Index: bin/thread-pool.rb =================================================================== --- bin/thread-pool.rb (revision 0) +++ bin/thread-pool.rb (revision 0) @@ -0,0 +1,30 @@ +require 'thread' + +class ThreadPool + def initialize(poolsize) + @queue = Queue.new + @poolsize = poolsize + @pool = Array.new(@poolsize) do |i| + Thread.new do + Thread.current[:id] = i + catch(:close) do + loop do + job, args = @queue.pop + job.call(*args) + end + end + end + end + end + + def launch(*args, &block) + @queue << [block, args] + end + + def stop + @poolsize.times do + launch { throw :close } + end + @pool.map(&:join) + end +end Index: bin/region_mover.rb =================================================================== --- bin/region_mover.rb (revision 1587710) +++ bin/region_mover.rb (working copy) @@ -24,6 +24,7 @@ # Does not work for case of multiple regionservers all running on the # one node. require 'optparse' +require File.join(File.dirname(__FILE__), 'thread-pool') include Java import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.HBaseConfiguration @@ -160,6 +161,7 @@ # Now move it. Do it in a loop so can retry if fail. Have seen issue where # we tried move region but failed and retry put it back on old location; # retry in this case. + retries = admin.getConfiguration.getInt("hbase.move.retries.max", 5) count = 0 same = true @@ -345,20 +347,26 @@ break if rs.length == 0 count = 0 $LOG.info("Moving " + rs.length.to_s + " region(s) from " + servername + - " during this cycle"); - for r in rs - # Get a random server to move the region to. - server = servers[rand(servers.length)] - $LOG.info("Moving region " + r.getRegionNameAsString() + " (" + - count.to_s + " of " + rs.length.to_s + ") from server=" + - servername + " to server=" + server); - count = count + 1 - # Assert we can scan region in its current location - isSuccessfulScan(admin, r) - # Now move it. 
- move(admin, r, server, servername) - movedRegions.add(r) + " on " + servers.length.to_s + " servers using " + options[:maxthreads].to_s + " threads.") + counter = 0 + pool = ThreadPool.new(options[:maxthreads]) + server_index = 0 + while counter < rs.length do + pool.launch(rs,counter,server_index) do |_rs,_counter,_server_index| + $LOG.info("Moving region " + _rs[_counter].getEncodedName() + " (" + _counter.to_s + + " of " + _rs.length.to_s + ") to server=" + servers[_server_index] + " for " + servername) + # Assert we can scan region in its current location + isSuccessfulScan(admin, _rs[_counter]) + # Now move it. + move(admin, _rs[_counter], servers[_server_index], servername) + movedRegions.add(_rs[_counter]) + end + counter += 1 + server_index = (server_index + 1) % servers.length end + $LOG.info("Waiting for the pool to complete") + pool.stop + $LOG.info("Pool completed") end if movedRegions.size() > 0 # Write out file of regions moved @@ -394,7 +402,10 @@ count = 0 # sleep 20s to make sure the rs finished initialization. 
sleep 20 - for r in regions + # Queue each move on the thread pool. Iterate with each so the next statements + # below advance to the following region instead of re-testing the same one forever. + pool = ThreadPool.new(options[:maxthreads]) + regions.each do |r| exists = false begin isSuccessfulScan(admin, r) @@ -402,19 +413,22 @@ rescue org.apache.hadoop.hbase.NotServingRegionException => e $LOG.info("Failed scan of " + e.message) end count = count + 1 next unless exists currentServer = getServerNameForRegion(admin, r) if currentServer and currentServer == servername $LOG.info("Region " + r.getRegionNameAsString() + " (" + count.to_s + - " of " + regions.length.to_s + ") already on target server=" + servername) + " of " + regions.length.to_s + ") already on target server=" + servername) next end - $LOG.info("Moving region " + r.getRegionNameAsString() + " (" + - count.to_s + " of " + regions.length.to_s + ") from server=" + - currentServer.to_s + " to server=" + servername.to_s); - move(admin, r, servername, currentServer) + pool.launch(r,currentServer,count) do |_r,_currentServer,_count| + $LOG.info("Moving region " + _r.getRegionNameAsString() + " (" + _count.to_s + + " of " + regions.length.to_s + ") from " + _currentServer.to_s + " to server=" + + servername); + move(admin, _r, servername, _currentServer) + end end + pool.stop end # Returns an array of hosts to exclude as region move targets @@ -455,6 +469,7 @@ opts.banner = "Usage: #{NAME}.rb [options] load|unload " opts.separator 'Load or unload regions by moving one at a time' options[:file] = nil + options[:maxthreads] = 1 opts.on('-f', '--filename=FILE', 'File to save regions list into unloading, or read from loading; default /tmp/') do |file| options[:file] = file end @@ -469,6 +484,9 @@ opts.on('-x', '--excludefile=FILE', 'File with hosts-per-line to exclude as unload targets; default excludes only target host; useful for rack decommisioning.') do |file| options[:excludesFile] = file end + opts.on('-m', '--maxthreads=XX', 'Define the maximum number of threads to use to unload and reload the 
regions') do |number| + options[:maxthreads] = [number.to_i, 1].max + end end optparse.parse! Index: bin/rolling-restart.sh =================================================================== --- bin/rolling-restart.sh (revision 1587710) +++ bin/rolling-restart.sh (working copy) @@ -34,7 +34,7 @@ # # Modelled after $HADOOP_HOME/bin/slaves.sh. -usage="Usage: $0 [--config ] [--rs-only] [--master-only] [--graceful]" +usage="Usage: $0 [--config ] [--rs-only] [--master-only] [--graceful] [--maxthreads xx]" bin=`dirname "$0"` bin=`cd "$bin">/dev/null; pwd` @@ -57,24 +57,33 @@ RR_RS=1 RR_MASTER=1 RR_GRACEFUL=0 +RR_MAXTHREADS=1 -for x in "$@" ; do - case "$x" in +while [ $# -gt 0 ]; do + case "$1" in --rs-only|-r) RR_RS=1 RR_MASTER=0 RR_GRACEFUL=0 + shift ;; --master-only) RR_RS=0 RR_MASTER=1 RR_GRACEFUL=0 + shift ;; --graceful) RR_RS=0 RR_MASTER=0 RR_GRACEFUL=1 + shift ;; + --maxthreads) + shift + RR_MAXTHREADS=$1 + shift + ;; *) - echo Bad argument: $x + echo Bad argument: $1 usage @@ -158,7 +167,7 @@ rs_parts=(${rs//,/ }) hostname=${rs_parts[0]} echo "Gracefully restarting: $hostname" - "$bin"/graceful_stop.sh --config "${HBASE_CONF_DIR}" --restart --reload --debug "$hostname" + "$bin"/graceful_stop.sh --config "${HBASE_CONF_DIR}" --restart --reload --debug --maxthreads "${RR_MAXTHREADS}" "$hostname" sleep 1 done fi Index: bin/graceful_stop.sh =================================================================== --- bin/graceful_stop.sh (revision 1587710) +++ bin/graceful_stop.sh (working copy) @@ -23,13 +23,14 @@ # Move regions off a server then stop it. Optionally restart and reload. # Turn off the balancer before running this script. 
function usage { - echo "Usage: graceful_stop.sh [--config ] [--restart [--reload]] [--thrift] [--rest] " - echo " thrift If we should stop/start thrift before/after the hbase stop/start" - echo " rest If we should stop/start rest before/after the hbase stop/start" - echo " restart If we should restart after graceful stop" - echo " reload Move offloaded regions back on to the restarted server" - echo " debug Print helpful debug information" - echo " hostname Hostname of server we are to stop" + echo "Usage: graceful_stop.sh [--config ] [--restart [--reload]] [--thrift] [--rest] [--maxthreads xx] " + echo " thrift If we should stop/start thrift before/after the hbase stop/start" + echo " rest If we should stop/start rest before/after the hbase stop/start" + echo " restart If we should restart after graceful stop" + echo " reload Move offloaded regions back on to the restarted server" + echo " debug Print helpful debug information" + echo " maxthreads xx Limit the number of threads used by the region mover. Default value is 1." 
+ echo " hostname Hostname of server we are to stop" exit 1 } @@ -47,6 +48,7 @@ debug= thrift= rest= +maxthreads=1 while [ $# -gt 0 ] do case "$1" in @@ -55,6 +57,7 @@ --restart) restart=true; shift;; --reload) reload=true; shift;; --debug) debug="--debug"; shift;; + --maxthreads) shift; maxthreads=$1; shift;; --) shift; break;; -*) usage ;; *) break;; # terminate while loop @@ -73,7 +76,7 @@ HBASE_BALANCER_STATE=`echo 'balance_switch false' | "$bin"/hbase --config ${HBASE_CONF_DIR} shell | tail -3 | head -1` echo "Previous balancer state was $HBASE_BALANCER_STATE" echo "Unloading $hostname region(s)" -HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug unload $hostname +HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads unload $hostname echo "Unloaded $hostname region(s)" # Stop the server. Have to put hostname into its own little file for hbase-daemons.sh hosts="/tmp/$(basename $0).$$.tmp" @@ -96,7 +99,7 @@ fi if [ "$reload" != "" ]; then echo "Reloading $hostname region(s)" - HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug load $hostname + HBASE_NOEXEC=true "$bin"/hbase --config ${HBASE_CONF_DIR} org.jruby.Main "$bin"/region_mover.rb --file=$filename $debug --maxthreads=$maxthreads load $hostname echo "Reloaded $hostname region(s)" fi fi