commit 8d5ec3cd2b044b5317c7e6a0fed9815ca65bc096 Author: nileema Date: 28 hours ago Online schema changes diff --git a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index f151c77..d06c233 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -837,6 +837,28 @@ public class HBaseAdmin implements Abortable, Closeable { } /** + * Get the status of alter command - indicates how many regions have received + * the updated schema Asynchronous operation. + * + * @param tableName + * name of the table to get the status of + * @return List indicating the number of regions updated List.get(0) is the + * regions that are yet to be updated List.get(1) is the total number + * of regions of the table + * @throws IOException + * if a remote or network exception occurs + */ + public Pair getAlterStatus(final byte[] tableName) + throws IOException { + HTableDescriptor.isLegalTableName(tableName); + try { + return getMaster().getAlterStatus(tableName); + } catch (RemoteException e) { + throw e; + } + } + + /** * Add a column to an existing table. * Asynchronous operation. 
* diff --git a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java index 13c8b8c..ab9b8bf 100644 --- a/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java +++ b/src/main/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Text; @@ -209,6 +210,7 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur addToMap(HLogKey.class, code++); addToMap(List.class, code++); + addToMap(NavigableSet.class, code++); addToMap(ColumnPrefixFilter.class, code++); @@ -238,6 +240,7 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur addToMap(RegionOpeningState.class, code++); + addToMap(Pair.class, code++); } private Class declaredClass; @@ -337,6 +340,9 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur if (code == null ) { if ( List.class.isAssignableFrom(c)) { code = CLASS_TO_CODE.get(List.class); + } + else if (Pair.class.isAssignableFrom(c)) { + code = CLASS_TO_CODE.get(Pair.class); } else if (Writable.class.isAssignableFrom(c)) { code = CLASS_TO_CODE.get(Writable.class); @@ -425,7 +431,11 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur writeObject(out, list.get(i), list.get(i).getClass(), conf); } - } else if (declClass == String.class) { // String + } else if (Pair.class.isAssignableFrom(declClass)) { + Pair pair = (Pair) instanceObj; + writeObject(out, pair.getFirst(), pair.getFirst().getClass(), conf); + writeObject(out, pair.getSecond(), pair.getSecond().getClass(), conf); + } 
else if (declClass == String.class) { // String Text.writeString(out, (String)instanceObj); } else if (declClass.isPrimitive()) { // primitive type if (declClass == Boolean.TYPE) { // boolean @@ -448,6 +458,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur } else { throw new IllegalArgumentException("Not a primitive: "+declClass); } + } else if (declClass == Integer.class) { // Integer + out.writeInt(((Integer)instanceObj).intValue()); } else if (declClass.isEnum()) { // enum Text.writeString(out, ((Enum)instanceObj).name()); } else if (Writable.class.isAssignableFrom(declClass)) { // Writable @@ -538,6 +550,8 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur } else { throw new IllegalArgumentException("Not a primitive: "+declaredClass); } + } else if (declaredClass == Integer.class) { // Integer + instance = Integer.valueOf(in.readInt()); } else if (declaredClass.isArray()) { // array if (declaredClass.equals(byte [].class)) { instance = Bytes.readByteArray(in); @@ -556,6 +570,10 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configur for (int i = 0; i < length; i++) { ((ArrayList)instance).add(readObject(in, conf)); } + } else if (Pair.class.isAssignableFrom(declaredClass)) { // Pair + instance = new Pair(); + ((Pair) instance).setFirst(readObject(in, conf)); + ((Pair) instance).setSecond(readObject(in, conf)); } else if (declaredClass == String.class) { // String instance = Text.readString(in); } else if (declaredClass.isEnum()) { // enum diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java index c0aa024..41c978f 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HColumnDescriptor; import 
org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.ipc.VersionedProtocol; /** @@ -72,6 +73,19 @@ public interface HMasterInterface extends VersionedProtocol { public void deleteTable(final byte [] tableName) throws IOException; /** + * Used by the client to get the number of regions that have received the + * updated schema + * + * @param tableName + * @return List indicating the number of regions updated List.get(0) is the + * regions that are yet to be updated List.get(1) is the total number + * of regions of the table + * @throws IOException + */ + public Pair getAlterStatus(byte[] tableName) + throws IOException; + + /** * Adds a column to the specified table * @param tableName table to modify * @param column column descriptor diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 49d1e7c..691cde9 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -24,6 +24,7 @@ import java.io.DataOutput; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -45,6 +46,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; @@ -101,6 +103,12 @@ public class AssignmentManager extends ZooKeeperListener { private TimeoutMonitor timeoutMonitor; + /** + * Map of regions to reopen after the schema of a table 
is changed. Maps + * encoded region name to the HRegionInfo + */ + private final Map regionsToReopen; + /* * Maximum times we recurse an assignment. See below in {@link #assign()}. */ @@ -162,6 +170,8 @@ public class AssignmentManager extends ZooKeeperListener { this.serverManager = serverManager; this.catalogTracker = catalogTracker; this.executorService = service; + this.regionsToReopen = Collections.synchronizedMap + (new HashMap ()); Configuration conf = master.getConfiguration(); this.timeoutMonitor = new TimeoutMonitor( conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000), @@ -202,8 +212,60 @@ public class AssignmentManager extends ZooKeeperListener { // sharing. return this.zkTable; } + /** + * This function returns the RegionServer to which hri is assigned. + * + * @param hri + * HRegion for which this function returns the region server + * @return HServerInfo The region server to which hri belongs + */ + public ServerName getRegionServerOfRegion(HRegionInfo hri) { + return regions.get(hri); + } + + /** + * Add a regionPlan for the specified region. 
+ */ + public void addPlan(String encodedName, RegionPlan plan) { + synchronized (regionPlans) { + regionPlans.put(encodedName, plan); + } + } + + /** + * Set the list of regions that will be because of an update in table schema + * + * @param regions + * list of regions that should be tracked for reopen + */ + public void setRegionsToReopen(List regions) { + for(HRegionInfo hri : regions) { + regionsToReopen.put(hri.getEncodedName(), hri); + } + } /** + * Used by the client to identify if all regions have the schema updates + * + * @param tableName + * @return Pair indicating the status of the alter command + * @throws IOException + */ + public Pair getReopenStatus(byte[] tableName) + throws IOException { + List hris = MetaReader.getTableRegions( + this.master.getCatalogTracker(), tableName); + LOG.info("======== in assignment manager"); + Integer pending = 0; + for(HRegionInfo hri : hris) { + if(regionsToReopen.get(hri.getEncodedName()) != null) { + pending++; + } + } + LOG.info("======== in assignment manager: returning " + pending); + return new Pair(pending, hris.size()); + } + /** * Reset all unassigned znodes. Called on startup of master. * Call {@link #assignAllUserRegions()} after root and meta have been assigned. * @throws IOException @@ -455,6 +517,10 @@ public class AssignmentManager extends ZooKeeperListener { final RegionState.State state, final RegionTransitionData oldData) { this.regionsInTransition.put(hri.getEncodedName(), new RegionState(hri, state, oldData.getStamp(), oldData.getOrigin())); + RegionState regionState = regionsInTransition.get(hri.getEncodedName()); + if (!regionsToReopen.isEmpty()) { + regionsToReopen.remove(regionState.getRegion().getEncodedName()); + } new ClosedRegionHandler(this.master, this, hri).process(); } @@ -593,6 +659,14 @@ public class AssignmentManager extends ZooKeeperListener { // what follows will fail because not in expected state. 
regionState.update(RegionState.State.CLOSED, data.getStamp(), data.getOrigin()); + if (!regionsToReopen.isEmpty()) { + HRegionInfo hri = regionsToReopen.remove(regionState.getRegion(). + getEncodedName()); + if (hri != null) { + LOG.info("Removed region from reopen list: " + + hri.getRegionNameAsString()); + } + } this.executorService.submit(new ClosedRegionHandler(master, this, regionState.getRegion())); break; @@ -1369,6 +1443,11 @@ public class AssignmentManager extends ZooKeeperListener { RegionPlan existingPlan = null; synchronized (this.regionPlans) { existingPlan = this.regionPlans.get(encodedName); + if (existingPlan != null && existingPlan.getDestination() != null) { + LOG.debug("Found an existing plan for " + + state.getRegion().getRegionNameAsString() + + " destination server is + " + existingPlan.getDestination().toString()); + } if (forceNewPlan || existingPlan == null || existingPlan.getDestination() == null || existingPlan.getDestination().equals(serverToExclude)) { @@ -1391,6 +1470,29 @@ public class AssignmentManager extends ZooKeeperListener { } /** + * Unasign the list of regions. Configuration knobs: + * hbase.bulk.reopen.waitafter.reopen indicates the number of milliseconds to + * wait before unassigning another region from this region server + * + * @param regions + * @throws InterruptedException + */ + public void unassign(List regions) throws InterruptedException { + int waitTime = this.master.getConfiguration().getInt( + "hbase.bulk.reopen.waitafter.reopen", 0); + for (HRegionInfo region : regions) { + if (isRegionInTransition(region) != null) + continue; + unassign(region, false); + while (isRegionInTransition(region) != null) { + Thread.sleep(10); + } + if (waitTime > 0) + Thread.sleep(waitTime); + } + } + + /** * Unassigns the specified region. *

/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;

/**
 * Performs a bulk reopen of the regions provided to it: each region is
 * unassigned with a region plan that sends it back to its current server,
 * so it is reopened in place and picks up an updated table schema.
 */
public class BulkReOpen extends BulkAssigner {
  private static final Log LOG = LogFactory.getLog(BulkReOpen.class);

  /** Regions to reopen, bucketed by the server currently hosting them. */
  private final Map<ServerName, List<HRegionInfo>> rsToRegions;
  private final AssignmentManager assignmentManager;

  public BulkReOpen(final Server server,
      final Map<ServerName, List<HRegionInfo>> serverToRegions,
      final AssignmentManager am) {
    super(server);
    this.assignmentManager = am;
    this.rsToRegions = serverToRegions;
  }

  /**
   * Unassign all regions, so that they go through the regular region
   * assignment flow (in the assignment manager) and are re-opened.
   * One pool task is submitted per region server.
   */
  @Override
  protected void populatePool(ExecutorService pool) {
    for (Map.Entry<ServerName, List<HRegionInfo>> e : rsToRegions.entrySet()) {
      final List<HRegionInfo> hris = e.getValue();
      // Add a plan for each region that needs to be reopened; destination ==
      // current server, so the region reopens where it already lives.
      for (HRegionInfo hri : hris) {
        RegionPlan reOpenPlan = new RegionPlan(hri, null,
            assignmentManager.getRegionServerOfRegion(hri));
        assignmentManager.addPlan(hri.getEncodedName(), reOpenPlan);
      }
      pool.execute(new Runnable() {
        public void run() {
          try {
            assignmentManager.unassign(hris);
          } catch (InterruptedException e) {
            // Fix: the original auto-generated catch printed the stack trace
            // and dropped the interrupt. Log it and restore the interrupt
            // status so the pool thread can observe it.
            LOG.warn("Unassigning regions for bulk reopen was interrupted", e);
            Thread.currentThread().interrupt();
          }
        }
      });
    }
  }

  /**
   * Reopen the regions asynchronously, so always returns true immediately.
   * @return true
   */
  @Override
  protected boolean waitUntilDone(long timeout) {
    return true;
  }

  /**
   * Configuration knob "hbase.bulk.reopen.threadpool.size": number of region
   * servers whose regions are reopened concurrently. The master never creates
   * more threads than there are region servers. Defaults to
   * {@link BulkAssigner#getThreadCount()} when unset.
   */
  @Override
  protected int getThreadCount() {
    // Configuration.getInt already falls back to the supplied default when
    // the property is absent, so the original's explicit null check plus a
    // second lookup was redundant.
    return this.server.getConfiguration().getInt(
        "hbase.bulk.reopen.threadpool.size", super.getThreadCount());
  }

  /**
   * Kick off the bulk reopen.
   * @return true if all regions were reopened within the bulk-assign timeout
   * @throws InterruptedException if the calling thread is interrupted
   */
  public boolean bulkReOpen() throws InterruptedException {
    return bulkAssign();
  }
}
*/ - private HRegionInterface getServerConnection(final ServerName sn) + public HRegionInterface getServerConnection(final ServerName sn) throws IOException { HRegionInterface hri = this.serverConnections.get(sn.toString()); if (hri == null) { diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java index 09891aa..b2843ec 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java @@ -20,18 +20,31 @@ package org.apache.hadoop.hbase.master.handler; import java.io.IOException; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.catalog.MetaReader; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventHandler.EventType; +import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; import org.apache.zookeeper.KeeperException; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + /** * Base class for performing operations against tables. 
* Checks on whether the process can go forward are done in constructor rather @@ -51,7 +64,15 @@ public abstract class TableEventHandler extends EventHandler { super(server, eventType); this.masterServices = masterServices; this.tableName = tableName; - this.masterServices.checkTableModifiable(tableName); + try { + this.masterServices.checkTableModifiable(tableName); + } catch (TableNotDisabledException ex) { + if (eventType != EventType.C_M_DELETE_TABLE) + LOG.debug("Ignoring table not disabled exception " + + "for supporting online schema changes."); + else + throw ex; + } this.tableNameStr = Bytes.toString(this.tableName); } @@ -64,6 +85,15 @@ public abstract class TableEventHandler extends EventHandler { MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); handleTableOperation(hris); + if (eventType != EventType.C_M_DELETE_TABLE) { + this.masterServices.getAssignmentManager().setRegionsToReopen(hris); + if (reOpenAllRegions(hris)) { + LOG.info("Completed table operation " + eventType + " on table " + + Bytes.toString(tableName)); + } else { + LOG.warn("Error on reopening the regions"); + } + } } catch (IOException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } catch (KeeperException e) { @@ -71,6 +101,45 @@ public abstract class TableEventHandler extends EventHandler { } } + public boolean reOpenAllRegions(List regions) throws IOException { + boolean done = false; + + LOG.info("Bucketing regions by region server..."); + HTable table = new HTable(masterServices.getConfiguration(), tableName); + TreeMap> serverToRegions = Maps + .newTreeMap(); + NavigableMap hriHserverMapping = table.getRegionLocations(); + + for(HRegionInfo hri: regions) { + ServerName rsLocation = hriHserverMapping.get(hri); + if (!serverToRegions.containsKey(rsLocation)) { + LinkedList hriList = Lists.newLinkedList(); + serverToRegions.put(rsLocation, hriList); + } + serverToRegions.get(rsLocation).add(hri); + } + LOG.info("Reopening " + 
regions.size() + " regions on " + + serverToRegions.size() + " region servers."); + + BulkReOpen bulkReopen = new BulkReOpen(this.server, serverToRegions, + this.masterServices.getAssignmentManager()); + while (true) { + try { + if (bulkReopen.bulkReOpen()) { + done = true; + break; + } else { + LOG.warn("Timeout before reopening all regions"); + } + } catch (InterruptedException e) { + LOG.warn("Reopen was interrupted"); + // Preserve the interrupt. + Thread.currentThread().interrupt(); + break; + } + } + return done; + } protected abstract void handleTableOperation(List regions) throws IOException, KeeperException; } \ No newline at end of file diff --git a/src/main/ruby/hbase/admin.rb b/src/main/ruby/hbase/admin.rb index 4460d6e..d50cb0f 100644 --- a/src/main/ruby/hbase/admin.rb +++ b/src/main/ruby/hbase/admin.rb @@ -19,6 +19,7 @@ # include Java +java_import org.apache.hadoop.hbase.util.Pair # Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin @@ -265,16 +266,31 @@ module Hbase end #---------------------------------------------------------------------------------------------- - # Change table structure or table options - def alter(table_name, *args) + # Check the status of alter command (number of regions reopened) + def alter_status(table_name) # Table name should be a string raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String) # Table should exist raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name) - # Table should be disabled - raise(ArgumentError, "Table #{table_name} is enabled. Disable it first before altering.") if enabled?(table_name) + status = Pair.new() + begin + status = @admin.getAlterStatus(table_name.to_java_bytes) + puts "#{status.getSecond() - status.getFirst()}/#{status.getSecond()} regions updated." + sleep 1 + end while status != nil && status.getFirst() != 0 + puts "Done." 
+ end + + #---------------------------------------------------------------------------------------------- + # Change table structure or table options + def alter(table_name, wait = true, *args) + # Table name should be a string + raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String) + + # Table should exist + raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name) # There should be at least one argument raise(ArgumentError, "There should be at least one argument but the table name") if args.empty? @@ -301,8 +317,16 @@ module Hbase # If column already exist, then try to alter it. Create otherwise. if htd.hasFamily(column_name.to_java_bytes) @admin.modifyColumn(table_name, column_name, descriptor) + if wait == true + puts "Updating all regions with the new schema..." + alter_status(table_name) + end else @admin.addColumn(table_name, descriptor) + if wait == true + puts "Updating all regions with the new schema..." + alter_status(table_name) + end end next end @@ -311,6 +335,10 @@ module Hbase if method == "delete" raise(ArgumentError, "NAME parameter missing for delete method") unless arg[NAME] @admin.deleteColumn(table_name, arg[NAME]) + if wait == true + puts "Updating all regions with the new schema..." + alter_status(table_name) + end next end @@ -321,6 +349,10 @@ module Hbase htd.setMemStoreFlushSize(JLong.valueOf(arg[MEMSTORE_FLUSHSIZE])) if arg[MEMSTORE_FLUSHSIZE] htd.setDeferredLogFlush(JBoolean.valueOf(arg[DEFERRED_LOG_FLUSH])) if arg[DEFERRED_LOG_FLUSH] @admin.modifyTable(table_name.to_java_bytes, htd) + if wait == true + puts "Updating all regions with the new schema..." 
+ alter_status(table_name) + end next end diff --git a/src/main/ruby/shell.rb b/src/main/ruby/shell.rb index 1ec330f..0282b04 100644 --- a/src/main/ruby/shell.rb +++ b/src/main/ruby/shell.rb @@ -227,6 +227,8 @@ Shell.load_command_group( is_enabled exists list + alter_status + alter_async ] ) diff --git a/src/main/ruby/shell/commands/alter.rb b/src/main/ruby/shell/commands/alter.rb index 1dd43ad..ee3668d 100644 --- a/src/main/ruby/shell/commands/alter.rb +++ b/src/main/ruby/shell/commands/alter.rb @@ -56,7 +56,7 @@ EOF def command(table, *args) format_simple_command do - admin.alter(table, *args) + admin.alter(table, true, *args) end end end diff --git a/src/main/ruby/shell/commands/alter_async.rb b/src/main/ruby/shell/commands/alter_async.rb new file mode 100644 index 0000000..10e714c --- /dev/null +++ b/src/main/ruby/shell/commands/alter_async.rb @@ -0,0 +1,66 @@ +# +# Copyright 2010 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class AlterAsync < Command + def help + return <<-EOF +Alter column family schema, does not wait for all regions to receive the +schema changes. Pass table name and a dictionary specifying new column +family schema. 
Dictionaries are described on the main help command output. +Dictionary must include name of column family to alter. For example, + +To change or add the 'f1' column family in table 't1' from defaults +to instead keep a maximum of 5 cell VERSIONS, do: + + hbase> alter_async 't1', NAME => 'f1', VERSIONS => 5 + +To delete the 'f1' column family in table 't1', do: + + hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete' + +or a shorter version: + + hbase> alter_async 't1', 'delete' => 'f1' + +You can also change table-scope attributes like MAX_FILESIZE +MEMSTORE_FLUSHSIZE, READONLY, and DEFERRED_LOG_FLUSH. + +For example, to change the max size of a family to 128MB, do: + + hbase> alter 't1', METHOD => 'table_att', MAX_FILESIZE => '134217728' + +There could be more than one alteration in one command: + + hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'} + +To check if all the regions have been updated, use alter_status +EOF + end + + def command(table, *args) + format_simple_command do + admin.alter(table, false, *args) + end + end + end + end +end diff --git a/src/main/ruby/shell/commands/alter_status.rb b/src/main/ruby/shell/commands/alter_status.rb new file mode 100644 index 0000000..aee096f --- /dev/null +++ b/src/main/ruby/shell/commands/alter_status.rb @@ -0,0 +1,38 @@ +# +# Copyright 2010 The Apache Software Foundation +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class AlterStatus < Command + def help + return <<-EOF +Get the status of the alter command. Indicates the number of regions of the +table that have received the updated schema +Pass table name. + +hbase> alter_status 't1' +EOF + end + def command(table) + admin.alter_status(table) + end + end + end +end diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java b/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java index a32dc99..f0c7a5e 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java @@ -40,14 +40,17 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventHandler.EventType; import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; @@ -210,11 +213,12 @@ 
public class TestAdmin { /** * Verify schema modification takes. * @throws IOException + * @throws InterruptedException */ - @Test - public void testChangeTableSchema() throws IOException { - final byte [] tableName = Bytes.toBytes("changeTableSchema"); + @Test public void testOnlineChangeTableSchema() throws IOException, InterruptedException { + final byte [] tableName = Bytes.toBytes("changeTableSchemaOnline"); HTableDescriptor [] tables = admin.listTables(); + MasterServices masterServices = TEST_UTIL.getMiniHBaseCluster().getMaster(); int numTables = tables.length; TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY); tables = this.admin.listTables(); @@ -233,26 +237,20 @@ public class TestAdmin { copy.setValue(key, key); boolean expectedException = false; try { - this.admin.modifyTable(tableName, copy); + modifyTable(tableName, copy); } catch (TableNotDisabledException re) { expectedException = true; } - assertTrue(expectedException); - this.admin.disableTable(tableName); - assertTrue(this.admin.isTableDisabled(tableName)); - modifyTable(tableName, copy); + assertFalse(expectedException); HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(tableName); // Assert returned modifiedhcd is same as the copy. assertFalse(htd.equals(modifiedHtd)); assertTrue(copy.equals(modifiedHtd)); assertEquals(newFlushSize, modifiedHtd.getMemStoreFlushSize()); assertEquals(key, modifiedHtd.getValue(key)); - - // Reenable table to test it fails if not disabled. - this.admin.enableTable(tableName); - assertFalse(this.admin.isTableDisabled(tableName)); - + // Now work on column family changes. 
+ htd = this.admin.getTableDescriptor(tableName); int countOfFamilies = modifiedHtd.getFamilies().size(); assertTrue(countOfFamilies > 0); HColumnDescriptor hcd = modifiedHtd.getFamilies().iterator().next(); @@ -266,36 +264,31 @@ public class TestAdmin { } catch (TableNotDisabledException re) { expectedException = true; } - assertTrue(expectedException); - this.admin.disableTable(tableName); - assertTrue(this.admin.isTableDisabled(tableName)); - // Modify Column is synchronous - this.admin.modifyColumn(tableName, hcd); + assertFalse(expectedException); modifiedHtd = this.admin.getTableDescriptor(tableName); HColumnDescriptor modifiedHcd = modifiedHtd.getFamily(hcdName); assertEquals(newMaxVersions, modifiedHcd.getMaxVersions()); // Try adding a column - // Reenable table to test it fails if not disabled. this.admin.enableTable(tableName); assertFalse(this.admin.isTableDisabled(tableName)); final String xtracolName = "xtracol"; + htd = this.admin.getTableDescriptor(tableName); HColumnDescriptor xtracol = new HColumnDescriptor(xtracolName); xtracol.setValue(xtracolName, xtracolName); + expectedException = false; try { this.admin.addColumn(tableName, xtracol); } catch (TableNotDisabledException re) { expectedException = true; } - assertTrue(expectedException); - this.admin.disableTable(tableName); - assertTrue(this.admin.isTableDisabled(tableName)); - this.admin.addColumn(tableName, xtracol); + // Add column should work even if the table is enabled + assertFalse(expectedException); modifiedHtd = this.admin.getTableDescriptor(tableName); hcd = modifiedHtd.getFamily(xtracol.getName()); assertTrue(hcd != null); assertTrue(hcd.getValue(xtracolName).equals(xtracolName)); - + // Delete the just-added column. 
this.admin.deleteColumn(tableName, xtracol.getName()); modifiedHtd = this.admin.getTableDescriptor(tableName); @@ -303,11 +296,12 @@ public class TestAdmin { assertTrue(hcd == null); // Delete the table + this.admin.disableTable(tableName); this.admin.deleteTable(tableName); this.admin.listTables(); assertFalse(this.admin.tableExists(tableName)); - } - + } + /** * Modify table is async so wait on completion of the table operation in master. * @param tableName