diff --git hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java index 47e428c4c4..d4fb2a0677 100644 --- hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java +++ hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; @@ -50,11 +51,17 @@ import org.apache.zookeeper.KeeperException; */ @InterfaceAudience.Private public class LogRollMasterProcedureManager extends MasterProcedureManager { + private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; - private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); + public static final String BACKUP_WAKE_MILLIS_KEY = "hbase.backup.logroll.wake.millis"; + public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.logroll.timeout.millis"; + public static final String BACKUP_POOL_THREAD_NUMBER_KEY = "hbase.backup.logroll.pool.thread.number"; + public static final int BACKUP_WAKE_MILLIS_DEFAULT = 500; + public static final int BACKUP_TIMEOUT_MILLIS_DEFAULT = 60000; + public static final int BACKUP_POOL_THREAD_NUMBER_DEFAULT = 8; private MasterServices master; private ProcedureCoordinator coordinator; private boolean done; @@ -69,6 +76,26 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager { return false; } +// @Override +// public void initialize(MasterServices master, MetricsMaster metricsMaster) +// throws KeeperException, IOException, UnsupportedOperationException { +// this.master = master; +// this.done = false; +// +// // setup the default procedure coordinator +// String name = master.getServerName().toString(); +// ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); +// BaseCoordinatedStateManager coordManager = +// (BaseCoordinatedStateManager) CoordinatedStateManagerFactory +// .getCoordinatedStateManager(master.getConfiguration()); +// coordManager.initialize(master); +// +// ProcedureCoordinatorRpcs comms = +// coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); +// +// this.coordinator = new ProcedureCoordinator(comms, tpool); +// } + @Override public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException, IOException, UnsupportedOperationException { @@ -77,16 +104,25 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager { // setup the default procedure coordinator String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); + + + // get the configuration for the coordinator + Configuration conf = master.getConfiguration(); + long wakeFrequency = conf.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT); + long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY,BACKUP_TIMEOUT_MILLIS_DEFAULT); + int opThreads = conf.getInt(BACKUP_POOL_THREAD_NUMBER_KEY, + BACKUP_POOL_THREAD_NUMBER_DEFAULT); + + // setup the default procedure coordinator + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); BaseCoordinatedStateManager coordManager = (BaseCoordinatedStateManager) CoordinatedStateManagerFactory - .getCoordinatedStateManager(master.getConfiguration()); + .getCoordinatedStateManager(master.getConfiguration()); coordManager.initialize(master); - ProcedureCoordinatorRpcs comms = coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); + this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); - this.coordinator = new ProcedureCoordinator(comms, tpool); } @Override @@ -152,4 +188,4 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager { return done; } -} \ No newline at end of file +}