serverTimestampMap = table.readRegionServerLastLogRollResult();
+ String host = rss.getServerName().getHostname();
+ String sts = serverTimestampMap.get(host);
+ if (sts != null && Long.parseLong(sts) > filenum) {
+ LOG.warn("Won't update server's last roll log result: current=" + sts + " new=" + filenum);
+ return null;
+ }
+ table.writeRegionServerLastLogRollResult(host, Long.toString(filenum));
+ // TODO: potential leak of HBase connection
+ // BackupSystemTable.close();
+ return null;
+ }
+
+ }
+
+ private void rolllog() throws ForeignException {
+
+ monitor.rethrowException();
+
+ taskManager.submitTask(new RSRollLogTask());
+ monitor.rethrowException();
+
+ // wait for everything to complete.
+ taskManager.waitForOutstandingTasks();
+ monitor.rethrowException();
+
+ }
+
+ @Override
+ public void acquireBarrier() throws ForeignException {
+ // do nothing, executing in inside barrier step.
+ }
+
+ /**
+ * do a log roll.
+ * @return some bytes
+ */
+ @Override
+ public byte[] insideBarrier() throws ForeignException {
+ rolllog();
+ // FIXME
+ return null;
+ }
+
+ /**
+ * Cancel threads if they haven't finished.
+ */
+ @Override
+ public void cleanup(Exception e) {
+ taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
+ }
+
+ /**
+ * Hooray!
+ */
+ public void releaseBarrier() {
+ // NO OP
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/RegionServerBackupManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/RegionServerBackupManager.java
new file mode 100644
index 0000000..debae80
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/backup/regionserver/RegionServerBackupManager.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.backup.regionserver;
+
+
+import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
+import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
+import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
+import org.apache.hadoop.hbase.procedure.ProcedureMember;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
+import org.apache.hadoop.hbase.procedure.Subprocedure;
+import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/**
+ * This manager class handles the work dealing with backup for a {@link HRegionServer}.
+ *
+ * This provides the mechanism necessary to kick off a backup specific {@link Subprocedure} that is
+ * responsible by this region server. If any failures occur with the subprocedure, the manager's
+ * procedure member notifies the procedure coordinator to abort all others.
+ *
+ * On startup, requires {@link #start()} to be called.
+ *
+ * On shutdown, requires org.apache.hadoop.hbase.procedure.ProcedureMember.close() to be
+ * called
+ */
+public class RegionServerBackupManager extends RegionServerProcedureManager {
+
+ private static final Log LOG = LogFactory.getLog(RegionServerBackupManager.class);
+
+ /** Conf key for number of request threads to start backup on regionservers */
+ public static final String BACKUP_REQUEST_THREADS_KEY = "hbase.backup.region.pool.threads";
+ /** # of threads for backup work on the rs. */
+ public static final int BACKUP_REQUEST_THREADS_DEFAULT = 10;
+
+ public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.timeout";
+ public static final long BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+ /** Conf key for millis between checks to see if backup work completed or if there are errors */
+ public static final String BACKUP_REQUEST_WAKE_MILLIS_KEY = "hbase.backup.region.wakefrequency";
+ /** Default amount of time to check for errors while regions finish backup work */
+ private static final long BACKUP_REQUEST_WAKE_MILLIS_DEFAULT = 500;
+
+ private RegionServerServices rss;
+ private ProcedureMemberRpcs memberRpcs;
+ private ProcedureMember member;
+
+ /**
+ * Create a default backup procedure manager
+ */
+ public RegionServerBackupManager() {
+ }
+
+ /**
+ * Start accepting backup procedure requests.
+ */
+ @Override
+ public void start() {
+ this.memberRpcs.start(rss.getServerName().toString(), member);
+ LOG.info("Started region server backup manager.");
+ }
+
+ /**
+ * Close this and all running backup procedure tasks
+ * @param force forcefully stop all running tasks
+ * @throws IOException
+ */
+ @Override
+ public void stop(boolean force) throws IOException {
+ String mode = force ? "abruptly" : "gracefully";
+ LOG.info("Stopping RegionServerBackupManager " + mode + ".");
+
+ try {
+ this.member.close();
+ } finally {
+ this.memberRpcs.close();
+ }
+ }
+
+ /**
+ * If in a running state, creates the specified subprocedure for handling a backup procedure.
+ * @return Subprocedure to submit to the ProcedureMemeber.
+ */
+ public Subprocedure buildSubprocedure() {
+
+ // don't run a backup if the parent is stop(ping)
+ if (rss.isStopping() || rss.isStopped()) {
+ throw new IllegalStateException("Can't start backup procedure on RS: " + rss.getServerName()
+ + ", because stopping/stopped!");
+ }
+
+ LOG.info("Attempting to run a roll log procedure for backup.");
+ ForeignExceptionDispatcher errorDispatcher = new ForeignExceptionDispatcher();
+ Configuration conf = rss.getConfiguration();
+ long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ long wakeMillis =
+ conf.getLong(BACKUP_REQUEST_WAKE_MILLIS_KEY, BACKUP_REQUEST_WAKE_MILLIS_DEFAULT);
+
+ BackupSubprocedurePool taskManager =
+ new BackupSubprocedurePool(rss.getServerName().toString(), conf);
+ return new LogRollBackupSubprocedure(rss, member, errorDispatcher, wakeMillis, timeoutMillis,
+ taskManager);
+
+ }
+
+ /**
+ * Build the actual backup procedure runner that will do all the 'hard' work
+ */
+ public class BackupSubprocedureBuilder implements SubprocedureFactory {
+
+ @Override
+ public Subprocedure buildSubprocedure(String name, byte[] data) {
+ return RegionServerBackupManager.this.buildSubprocedure();
+ }
+ }
+
+ @Override
+ public void initialize(RegionServerServices rss) throws IOException {
+ this.rss = rss;
+ BaseCoordinatedStateManager coordManager =
+ (BaseCoordinatedStateManager) CoordinatedStateManagerFactory.getCoordinatedStateManager(rss
+ .getConfiguration());
+ coordManager.initialize(rss);
+ this.memberRpcs =
+ coordManager
+ .getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);
+
+ // read in the backup handler configuration properties
+ Configuration conf = rss.getConfiguration();
+ long keepAlive = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY, BACKUP_TIMEOUT_MILLIS_DEFAULT);
+ int opThreads = conf.getInt(BACKUP_REQUEST_THREADS_KEY, BACKUP_REQUEST_THREADS_DEFAULT);
+ // create the actual cohort member
+ ThreadPoolExecutor pool =
+ ProcedureMember.defaultPool(rss.getServerName().toString(), opThreads, keepAlive);
+ this.member = new ProcedureMember(memberRpcs, pool, new BackupSubprocedureBuilder());
+ }
+
+ @Override
+ public String getProcedureSignature() {
+ return "backup-proc";
+ }
+
+}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
index ae36f08..ef10010 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/BaseCoordinatedStateManager.java
@@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Server;
@@ -55,4 +59,17 @@ public abstract class BaseCoordinatedStateManager implements CoordinatedStateMan
* Method to retrieve coordination for split log manager
*/
public abstract SplitLogManagerCoordination getSplitLogManagerCoordination();
+ /**
+ * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
+ */
+ public abstract ProcedureCoordinatorRpcs
+ getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException;
+
+ /**
+ * Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpc}
+ */
+ public abstract ProcedureMemberRpcs
+ getProcedureMemberRpcs(String procType) throws IOException;
+
+
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 3e89be7..a0e7f16 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -17,9 +17,15 @@
*/
package org.apache.hadoop.hbase.coordination;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinatorRpcs;
+import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@@ -54,4 +60,15 @@ public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
public SplitLogManagerCoordination getSplitLogManagerCoordination() {
return splitLogManagerCoordination;
}
+
+ @Override
+ public ProcedureCoordinatorRpcs
+ getProcedureCoordinatorRpcs(String procType, String coordNode) throws IOException {
+ return new ZKProcedureCoordinatorRpcs(watcher, procType, coordNode);
+ }
+
+ @Override
+ public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws IOException {
+ return new ZKProcedureMemberRpcs(watcher, procType);
+ }
}
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
index 9d9cee0..53151d7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java
@@ -85,6 +85,9 @@ public class WALPlayer extends Configured implements Tool {
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
+ public WALPlayer(){
+ }
+
protected WALPlayer(final Configuration c) {
super(c);
}
@@ -197,9 +200,7 @@ public class WALPlayer extends Configured implements Tool {
public void setup(Context context) throws IOException {
String[] tableMap = context.getConfiguration().getStrings(TABLE_MAP_KEY);
String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
- if (tablesToUse == null && tableMap == null) {
- // Then user wants all tables.
- } else if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
+ if (tablesToUse == null || tableMap == null || tablesToUse.length != tableMap.length) {
// this can only happen when WALMapper is used directly by a class other than WALPlayer
throw new IOException("No tables or incorrect table mapping specified.");
}
@@ -305,7 +306,8 @@ public class WALPlayer extends Configured implements Tool {
System.err.println("Usage: " + NAME + " [options] []");
System.err.println("Read all WAL entries for .");
System.err.println("If no tables (\"\") are specific, all tables are imported.");
- System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported in that case.)");
+ System.err.println("(Careful, even -ROOT- and hbase:meta entries will be imported"+
+ " in that case.)");
System.err.println("Otherwise is a comma separated list of tables.\n");
System.err.println("The WAL entries can be mapped to new set of tables via