commit 20e98690d55eae55deac9389c6bb5b2a543c8c8e
Author: Yu Li
Date:   Tue Dec 27 16:21:41 2016 +0800

    This is a combination of 9 commits.

    * Add support for embedded mode
    * Refine retry code in DirectDBOperator
    * Support mutate in embedded HBase
      - Including delete, append and increment; a known bug remains where delete does not work after parallel append/increment with more than 5 threads
    * Add support of Scan for EmbeddedHBase
    * Support snapshot, deleteSnapshot and restoreSnapshot in embedded HBase
    * Remove unused import and repeated setMetricsSystem call
    * Avoid starting the RPC server in embedded mode
    * Code refactor: clean up unnecessary interfaces, add javadocs and rename classes
    * Refactor embedded scanner

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 2bb2091..51eaa35 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -262,7 +262,7 @@ public final class ConnectionUtils { /** * Create a row before the specified row and very close to the specified row. */ - static byte[] createCloseRowBefore(byte[] row) { + public static byte[] createCloseRowBefore(byte[] row) { if (row.length == 0) { return MAX_BYTE_ARRAY; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 0c383fc..512a8b6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1329,4 +1329,4 @@ public class HTable implements Table { } return mutator; } -} \ No newline at end of file +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 609e9a5..72394f0 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1353,6 +1353,9 @@ public final class HConstants { public static final String DEFAULT_TEMPORARY_HDFS_DIRECTORY = "/user/" + System.getProperty("user.name") + "/hbase-staging"; + public static final String EMBEDDED_MODE = "hbase.embedded"; + public static final String EMBEDDED_DB_OPERATOR_CLASS = "hbase.embedded.operator.class"; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/DirectOperator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/DirectOperator.java new file mode 100644 index 0000000..5187b67 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/DirectOperator.java @@ -0,0 +1,193 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.embedded; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Threads; + +/** + * Operator supplying direct access to regions + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DirectOperator extends Operator { + private final Log LOG = LogFactory.getLog(DirectOperator.class); + final EmbeddedRpcServices rpcServices; + final TableName tableName; + final int maxRetries; + final long pause; + + public DirectOperator(EmbeddedHBase eHBase, Table table) { + super(eHBase, table); + this.rpcServices = (EmbeddedRpcServices) eHBase.getRSRpcServices(); + this.tableName = table.getName(); + this.maxRetries = eHBase.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + this.pause = eHBase.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + } + + @Override + public Result get(Get get) throws IOException { + int tries = 0; + List<ThrowableWithExtraContext> exceptions = null; + while (true) { + try { + return rpcServices.get(tableName, get); + } catch (IOException e) { + exceptions = checkRetry(tries, e, exceptions); + Threads.sleep(ConnectionUtils.getPauseTime(pause, tries)); + tries++; + } + } + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + int tries = 0; + List<ThrowableWithExtraContext> exceptions = null; + while (true) { + try { + return rpcServices.getScanner(tableName, scan); + } catch (IOException e) { + exceptions = checkRetry(tries, e, exceptions); + Threads.sleep(ConnectionUtils.getPauseTime(pause, tries)); + tries++; + } + } + } + + @Override + public void put(Put put) throws IOException { + int tries = 0; + List<ThrowableWithExtraContext> exceptions = null; + while (true) { + try { + rpcServices.put(tableName, put); + break; + } catch (IOException e) { + exceptions = checkRetry(tries, e, exceptions); + Threads.sleep(ConnectionUtils.getPauseTime(pause, tries)); + tries++; + } + } + } + + /** + * Records the failure of the current attempt and throws RetriesExhaustedException once + * maxRetries is reached; otherwise logs it and returns the accumulated exception list so the + * caller can carry it into the next retry. + */ + private List<ThrowableWithExtraContext> checkRetry(int tries, IOException e, + List<ThrowableWithExtraContext> exceptions) throws IOException { + if (exceptions == null) { + exceptions = new ArrayList<>(); + } + exceptions.add(new ThrowableWithExtraContext(e, System.currentTimeMillis(), "tries=" + tries)); + if (tries >= maxRetries) { + throw new RetriesExhaustedException(tries, exceptions); + } else { + LOG.debug("Caught exception, tries=" + tries, e); + } + return exceptions; + } + + @Override + public void delete(Delete delete) throws IOException { + int tries = 0; + List<ThrowableWithExtraContext> exceptions = null; + while (true) { + try { + rpcServices.delete(tableName, delete); + break; + } catch (IOException e) { + exceptions = checkRetry(tries, e, exceptions); + Threads.sleep(ConnectionUtils.getPauseTime(pause, tries)); + tries++; + } + } + } + + @Override + public Result append(Append append) throws IOException { + int tries = 0; + List<ThrowableWithExtraContext> exceptions = null; + while (true) { + try { + return rpcServices.append(tableName, append); + } catch (IOException e) { + exceptions = checkRetry(tries, e, exceptions); + Threads.sleep(ConnectionUtils.getPauseTime(pause, tries)); + tries++; + } + } + } + + @Override + public Result increment(Increment increment) throws IOException { + int tries = 0; + List<ThrowableWithExtraContext> exceptions = null; + while (true) { + try { + return rpcServices.increment(tableName, increment); + } catch (IOException e) { + exceptions = checkRetry(tries, e, exceptions); + Threads.sleep(ConnectionUtils.getPauseTime(pause, tries)); + tries++; + } + } + } + + @Override + public void snapshot(String snapshotName) throws IOException { + rpcServices.snapshot(snapshotName, tableName); + } + + @Override + public void snapshot(String snapshotName, SnapshotType type) throws IOException { + rpcServices.snapshot(snapshotName, tableName, type); + } + + @Override + public void deleteSnapshot(String snapshotName) throws IOException { + rpcServices.deleteSnapshot(snapshotName); + } + + @Override + public void restoreSnapshot(String snapshotName, String tableName) throws IOException { + rpcServices.restoreSnapshot(snapshotName, tableName); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedHBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedHBase.java new file mode 100644 index 0000000..032658e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedHBase.java @@ -0,0 +1,412 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.embedded; + +import java.io.File; +import java.io.IOException; +import java.net.UnknownHostException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.regionserver.ConstantFamilySizeRegionSplitPolicy; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.LastSequenceId; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DNS; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.RegionGroupingProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.zookeeper.KeeperException; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Utility class to use HBase in embedded mode + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class EmbeddedHBase extends HMaster + implements RegionServerServices, LastSequenceId, ConfigurationObserver { + private static final Log LOG = LogFactory.getLog(EmbeddedHBase.class); + private static MiniZooKeeperCluster zkCluster = null; + + // operator to handle requests, currently support both Table and direct access + private Operator operator; + + public EmbeddedHBase(Configuration conf, CoordinatedStateManager csm) + throws IOException, InterruptedException, KeeperException { + super(conf, csm); + } + + public static EmbeddedHBase open(final Options opts, final String path) + throws IOException, InterruptedException, KeeperException { + Configuration conf = HBaseConfiguration.create(); + return open(opts, path, conf); + } + + @Deprecated + public static EmbeddedHBase open(final TableName table, final String path) + throws IOException, InterruptedException, KeeperException { + Options opts = new Options(); + opts.setTableName(table); + 
return open(opts, path); + } + + @VisibleForTesting + public static EmbeddedHBase open(final Options opts, final String path, final Configuration conf) + throws IOException, InterruptedException, KeeperException { + doNecessaryConfig(conf, path); + + // FIXME now we're still using the zookeeper cluster and assignment manager for simple embeeded + // mode, later we should remove all distributed-related stuff + + // initialize mini-zookeeper cluster + startMiniZookeeperCluster(conf); + + // Get the instance + CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); + EmbeddedHBase eHBase = new EmbeddedHBase(conf, csm); + eHBase.start(); + waitUntilStarted(eHBase); + eHBase.setDBOprator(createTableIfNotExist(eHBase, opts)); + return eHBase; + } + + /** + * Necessary hard-coded configurations like using embedded mode, disable replication, etc. + * @param conf The configuration instance + * @param path The given path for the embedded database + * @throws UnknownHostException if failed to get host name + */ + private static void doNecessaryConfig(Configuration conf, String path) + throws UnknownHostException { + // set path and enable embedded mode + conf.set(HConstants.HBASE_DIR, path); + conf.setBoolean(HConstants.EMBEDDED_MODE, true); + + // get and set hostname + String hostName = Strings.domainNamePointerToHostName(DNS.getDefaultHost("default", "default")); + conf.set(MASTER_HOSTNAME_KEY, hostName); + conf.set(RS_HOSTNAME_KEY, hostName); + + // set path for the embedded zookeeper cluster + conf.set(HConstants.ZOOKEEPER_DATA_DIR, path + File.separator + "zookeeper"); + + // set no table on master so we won't require 2 RS + conf.set(BaseLoadBalancer.TABLES_ON_MASTER, "none"); + + // set server handler count although no rpc server + // since FSHLog sync queue length is based on this setting + conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 500); + + // increase blocking store files number to some higher value + conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 20); + + // ConstantFamilySizeRegionSplitPolicy is necessary to disable split + conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY, + ConstantFamilySizeRegionSplitPolicy.class.getName()); + + // use one WAL per region to increase writing throughput + conf.set(WALFactory.WAL_PROVIDER, "multiwal"); + conf.set(RegionGroupingProvider.REGION_GROUPING_STRATEGY, "identity"); + + // enable snapshot + conf.setBoolean(SnapshotManager.HBASE_SNAPSHOT_ENABLED, true); + } + + private static Table createTableIfNotExist(EmbeddedHBase eHBase, Options opts) + throws IOException { + TableName tableName = opts.getTableName(); + if (tableName == null) { + throw new IllegalArgumentException("Table name not set"); + } + ClusterConnection conn = eHBase.getClusterConnection(); + Admin admin = conn.getAdmin(); + if (!admin.tableExists(tableName)) { + HTableDescriptor tableDesc = new HTableDescriptor(tableName); + tableDesc.setDurability(opts.getDurability()); + addFamilies(tableDesc, opts); + tableDesc.setMaxFileSize(Long.MAX_VALUE); + byte[][] splitKeys = opts.getSplitKeys(); + eHBase.createTable(tableDesc, splitKeys, conn.getNonceGenerator().getNonceGroup(), + conn.getNonceGenerator().newNonce()); + } + while (!admin.isTableAvailable(tableName)) { + Threads.sleep(1000); + } + return conn.getTable(tableName); + } + + private static void addFamilies(HTableDescriptor tableDesc, Options opts) { + if (opts.getFamilies() == null) { + HColumnDescriptor family = new HColumnDescriptor(Options.DEFAULT_FAMILY); + setFamily(family, opts); 
+ tableDesc.addFamily(family); + return; + } + for (byte[] familyName : opts.getFamilies()) { + HColumnDescriptor family = new HColumnDescriptor(familyName); + setFamily(family, opts); + tableDesc.addFamily(family); + } + } + + private static void setFamily(HColumnDescriptor family, Options opts) { + family.setCompressionType(opts.getCompression()); + family.setBlocksize(opts.getBlockSize()); + } + + static byte[][] getSplitKeys(int splits) { + byte[][] splitKeys = new byte[splits][]; + for (int i = 0; i < splits; i++) { + String key = "user" + (1000 + (i + 1) * (9999 - 1000) / splits); + LOG.trace("key of split " + i + ": " + key); + splitKeys[i] = Bytes.toBytes(key); + } + return splitKeys; + } + + private static void waitUntilStarted(EmbeddedHBase eHBase) { + while (!eHBase.isInitialized()) { + Threads.sleep(1000); + } + } + + private void setDBOprator(Table table) { + String operatorClass = + conf.get(HConstants.EMBEDDED_DB_OPERATOR_CLASS, DirectOperator.class.getName()); + try { + this.operator = + (Operator) ReflectionUtils.newInstance(Class.forName(operatorClass), this, table); + LOG.debug("Operator: " + operator.getClass().getName()); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + private static void startMiniZookeeperCluster(Configuration conf) + throws IOException, InterruptedException { + final MiniZooKeeperCluster zooKeeperCluster = new MiniZooKeeperCluster(conf); + File zkDataPath = new File(conf.get(HConstants.ZOOKEEPER_DATA_DIR)); + LOG.debug("zookeeper path: "+zkDataPath.getAbsolutePath()); + + // find out the default client port + int zkClientPort = 0; + + // If the zookeeper client port is specified in server quorum, use it. + String zkserver = conf.get(HConstants.ZOOKEEPER_QUORUM); + if (zkserver != null) { + String[] zkservers = zkserver.split(","); + + if (zkservers.length > 1) { + // In embedded mode we only support one zookeeper server. + String errorMsg = + "Could not start ZK with " + zkservers.length + " ZK servers in embedded mode."; + LOG.error(errorMsg); + throw new IOException(errorMsg); + } + + String[] parts = zkservers[0].split(":"); + + if (parts.length == 2) { + // the second part is the client port + zkClientPort = Integer.parseInt(parts[1]); + } + } + // If the client port could not be find in server quorum conf, try another conf + if (zkClientPort == 0) { + zkClientPort = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 0); + // The client port has to be set by now; if not, throw exception. + if (zkClientPort == 0) { + throw new IOException("No config value for " + HConstants.ZOOKEEPER_CLIENT_PORT); + } + } + zooKeeperCluster.setDefaultClientPort(zkClientPort); + // set the ZK tick time if specified + int zkTickTime = conf.getInt(HConstants.ZOOKEEPER_TICK_TIME, 0); + if (zkTickTime > 0) { + zooKeeperCluster.setTickTime(zkTickTime); + } + + // login the zookeeper server principal (if using security) + ZKUtil.loginServer(conf, "hbase.zookeeper.server.keytab.file", + "hbase.zookeeper.server.kerberos.principal", null); + int localZKClusterSessionTimeout = + conf.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", 10 * 1000); + conf.setInt(HConstants.ZK_SESSION_TIMEOUT, localZKClusterSessionTimeout); + LOG.info("Starting a zookeeper cluster"); + int clientPort = zooKeeperCluster.startup(zkDataPath); + if (clientPort != zkClientPort) { + String msg = "Could not start ZK at requested port of " + zkClientPort + + ". 
Instead ZK is started at port: " + clientPort; + LOG.warn(msg); + } + conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); + zkCluster = zooKeeperCluster; + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new EmbeddedRpcServices(this); + } + + public void put(final Put put) throws IOException { + operator.put(put); + } + + public Result get(final Get get) throws IOException { + return operator.get(get); + } + + public Result append(final Append append) throws IOException { + return operator.append(append); + } + + public Result increment(final Increment increment) throws IOException { + return operator.increment(increment); + } + + public void delete(final Delete delete) throws IOException { + operator.delete(delete); + } + + public ResultScanner getResultScanner(final Scan scan) throws IOException { + return operator.getScanner(scan); + } + + public void snapshot(final String snapshotName) throws IOException { + operator.snapshot(snapshotName); + } + + public void snapshot(String snapshotName, SnapshotType type) throws IOException { + operator.snapshot(snapshotName, type); + } + + public void deleteSnapshot(final String snapshotName) throws IOException { + operator.deleteSnapshot(snapshotName); + } + + public void restoreSnapshot(final String snapshotName) throws IOException { + TableName table = operator.getTableName(); + // disable table first before restore + Admin admin = getConnection().getAdmin(); + admin.disableTable(table); + operator.restoreSnapshot(snapshotName, table.getNameAsString()); + // enable table after restore + admin.enableTable(table); + while (!admin.isTableAvailable(table)) { + Threads.sleep(1000); + } + } + + public void close() throws IOException { + this.shutdown(); + waitUntilStopped(); + if (zkCluster != null) { + zkCluster.shutdown(); + } + } + + private void waitUntilStopped() { + while (!isShutDown()) { + Threads.sleep(1000); + } + } + + public static void main(String[] args) { + TableName table = TableName.valueOf("test"); + EmbeddedHBase eHBase = null; + try { + eHBase = EmbeddedHBase.open(table, "/tmp/embeddedhbase"); + } catch (Exception e) { + LOG.fatal("Failed to open database", e); + throw new RuntimeException(e); + } + LOG.debug("Server started"); + final byte[] row = Bytes.toBytes("row"); + final byte[] family = Bytes.toBytes("cf"); + final byte[] column = Bytes.toBytes("c1"); + final byte[] value = Bytes.toBytes("value"); + Put put = new Put(row); + put.addColumn(family, column, value); + try { + eHBase.put(put); + } catch (IOException e) { + LOG.error("Failed to put into database", e); + try { + eHBase.close(); + } catch (IOException ex) { + } + throw new RuntimeException(e); + } + Get get = new Get(row); + Result result = null; + try { + result = eHBase.get(get); + LOG.debug("Result: " + Bytes.toString(result.getValue(family, column))); + } catch (IOException e) { + LOG.error("Failed to get data from database", e); + try { + eHBase.close(); + } catch (IOException ex) { + } + throw new RuntimeException(e); + } + LOG.debug("Stopping server"); + if(eHBase!=null){ + try { + eHBase.close(); + } catch (IOException e) { + LOG.warn("Failed to stop server", e); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedReverseScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedReverseScanner.java new file mode 100644 index 0000000..d366501 --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedReverseScanner.java @@ -0,0 +1,145 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.embedded; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.client.ReversedScannerCallable; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class EmbeddedReverseScanner extends EmbeddedSimpleScanner { + private static final Log LOG = LogFactory.getLog(EmbeddedReverseScanner.class); + + public EmbeddedReverseScanner(Configuration conf, Scan scan, TableName tableName, + HRegionServer regionServer) throws IOException { + super(conf, scan, tableName, regionServer); + } + + @Override + protected boolean nextScanner() throws IOException { + // Where to start the next scanner + byte[] localStartKey; + boolean locateTheClosestFrontRow = true; + if (this.currentRegion != null) { + byte[] startKey = this.currentRegion.getRegionInfo().getStartKey(); + if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(startKey)) { + close(); + if (LOG.isTraceEnabled()) { + LOG.trace("Finished " + this.currentRegion); + } + return false; + } + localStartKey = startKey; + if (LOG.isTraceEnabled()) { + LOG.trace("Finished " + this.currentRegion); + } + } else { + localStartKey = this.scan.getStartRow(); + if (!Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY)) { + locateTheClosestFrontRow = false; + } + } + + if (LOG.isDebugEnabled() && this.currentRegion != null) { + // Only worth logging if NOT first region in scan. + LOG.trace( + "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); + } + + try { + byte[] locateStartRow = + locateTheClosestFrontRow ? 
ConnectionUtils.createCloseRowBefore(localStartKey) : null; + if (locateStartRow == null) { + // Just locate the region with the row + this.currentRegion = (HRegion) getRegion(tableName, localStartKey); + if (this.currentRegion == null) { + throw new IOException("Failed to find location, tableName=" + tableName + ", row=" + + Bytes.toStringBinary(localStartKey)); + } + scan.withStartRow(localStartKey); + } else { + List locatedRegions = locateRegionsInRange(locateStartRow, localStartKey); + if (locatedRegions.isEmpty()) { + throw new DoNotRetryIOException( + "Does hbase:meta exist hole? Couldn't get regions for the range from " + + Bytes.toStringBinary(locateStartRow) + " to " + + Bytes.toStringBinary(localStartKey)); + } + this.currentRegion = (HRegion) locatedRegions.get(locatedRegions.size() - 1); + scan.withStartRow(locateStartRow); + } + + this.regionScanner = this.currentRegion.getScanner(scan); + } catch (IOException e) { + close(); + throw e; + } + return true; + } + + /** + * Get the corresponding regions for an arbitrary range of keys, copied from + * {@link ReversedScannerCallable} + * @param startKey Starting row in range, inclusive + * @param endKey Ending row in range, exclusive + * @return A list of HRegionLocation corresponding to the regions that contain the specified range + * @throws IOException + */ + private List locateRegionsInRange(byte[] startKey, byte[] endKey) throws IOException { + final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); + if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { + throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(startKey) + " > " + + Bytes.toStringBinary(endKey)); + } + List regionList = new ArrayList(); + byte[] currentKey = startKey; + do { + Region region = getRegion(tableName, currentKey); + if (region != null && region.getRegionInfo().containsRow(currentKey)) { + regionList.add(region); + } else { + throw new DoNotRetryIOException( + "Does hbase:meta exist hole? Locating row " + Bytes.toStringBinary(currentKey) + + " returns incorrect region " + (region == null ? null : region.getRegionInfo())); + } + currentKey = region.getRegionInfo().getEndKey(); + } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) + && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0)); + return regionList; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedRpcServices.java new file mode 100644 index 0000000..7bf6d22 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedRpcServices.java @@ -0,0 +1,218 @@ +/** +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.hadoop.hbase.embedded; + +import com.google.common.base.Preconditions; +import java.io.IOException; +import java.io.InterruptedIOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotDescription; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils; +import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; +import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class EmbeddedRpcServices extends MasterRpcServices { + private static final Log LOG = LogFactory.getLog(EmbeddedRpcServices.class); + private final HMaster server; + private final long pause; + private final int numRetries; + + public EmbeddedRpcServices(HMaster server) throws IOException { + super(server); + this.server = server; + this.pause = server.getConfiguration().getLong(HConstants.HBASE_CLIENT_PAUSE, + HConstants.DEFAULT_HBASE_CLIENT_PAUSE); + this.numRetries = server.getConfiguration().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + } + + @Override + protected void start() { + // do nothing here + } + + @Override + protected void stop() { + closeAllScanners(); + } + + public Result get(TableName tableName, Get get) throws IOException { + checkOpen(); + Region region = getRegion(tableName, get.getRow()); + return get(get, ((HRegion) region)); + } + + private Result get(Get get, HRegion region) throws IOException { + return region.get(get); + } + + public void put(TableName tableName, Put put) throws IOException { + getRegion(tableName, put).put(put); + } + + public void delete(TableName tableName, Delete delete) throws IOException { + getRegion(tableName, delete).delete(delete); + } + + public Result append(TableName tableName, Append append) throws IOException { + // don't need nonce in embedded mode, see #HBASE-3787 about nonce + return 
getRegion(tableName, append).append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + public Result increment(TableName tableName, Increment increment) throws IOException { + // don't need nonce in embedded mode, see #HBASE-3787 about nonce + return getRegion(tableName, increment).increment(increment, HConstants.NO_NONCE, + HConstants.NO_NONCE); + } + + /** + * Get region for mutation, and reclaim memstore memory if necessary + * @param tableName the name of the table + * @param mutate the mutation + * @return the region to operate + * @throws IOException if any error occurs + */ + private Region getRegion(TableName tableName, Mutation mutate) throws IOException { + checkOpen(); + Region region = getRegion(tableName, mutate.getRow()); + if (!region.getRegionInfo().isMetaTable()) { + reclaimMemStoreMemory(); + } + return region; + } + + public ResultScanner getScanner(TableName tableName, Scan scan) throws IOException { + Preconditions.checkNotNull(tableName); + Preconditions.checkNotNull(scan); + + checkOpen(); + + if (!scan.isReversed()) { + return new EmbeddedSimpleScanner(getConfiguration(), scan, tableName, server); + } else { + return new EmbeddedReverseScanner(getConfiguration(), scan, tableName, server); + } + } + + @Override + public ScanResponse scan(final RpcController controller, final ScanRequest request) + throws ServiceException { + return super.scan(controller, request); + } + + private Region getRegion(TableName tableName, byte[] row) throws IOException { + byte[] regionName = + server.getClusterConnection().locateRegion(tableName, row).getRegionInfo().getRegionName(); + return server.getRegionByEncodedName(HRegionInfo.encodeRegionName(regionName)); + } + + public void snapshot(final String snapshotName, final TableName tableName) throws IOException { + snapshot(snapshotName, tableName, SnapshotType.FLUSH); + } + + public void snapshot(final String snapshotName, final TableName tableName, SnapshotType type) + throws IOException { + SnapshotDescription snapshotDesc = new SnapshotDescription(snapshotName, tableName, type); + HBaseProtos.SnapshotDescription snapshot = + ProtobufUtil.createHBaseProtosSnapshotDesc(snapshotDesc); + snapshot = SnapshotDescriptionUtils.validate(snapshot, getConfiguration()); + SnapshotManager snapshotManager = server.getSnapshotManager(); + LOG.info("Snapshot request for:" + ClientSnapshotDescriptionUtils.toString(snapshot)); + snapshotManager.takeSnapshot(snapshot); + long max = SnapshotDescriptionUtils.getMaxMasterTimeout(getConfiguration(), snapshot.getType(), + SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME); + long start = EnvironmentEdgeManager.currentTime(); + long maxPauseTime = max / this.numRetries; + int tries = 0; + boolean done = false; + while (tries == 0 || ((EnvironmentEdgeManager.currentTime() - start) < max && !done)) { + try { + // sleep a backoff <= pauseTime amount + long sleep = getPauseTime(tries++); + sleep = sleep > maxPauseTime ? 
maxPauseTime : sleep; + LOG.debug( + "(#" + tries + ") Sleeping: " + sleep + "ms while waiting for snapshot completion."); + Thread.sleep(sleep); + } catch (InterruptedException e) { + throw (InterruptedIOException) new InterruptedIOException("Interrupted").initCause(e); + } + LOG.debug("Getting current status of snapshot from master..."); + done = snapshotManager.isSnapshotDone(snapshot); + } + if (!done) { + throw new SnapshotCreationException( + "Snapshot '" + snapshot.getName() + "' wasn't completed in expectedTime:" + max + " ms", + snapshotDesc); + } + } + + private long getPauseTime(int tries) { + int triesCount = tries; + if (triesCount >= HConstants.RETRY_BACKOFF.length) { + triesCount = HConstants.RETRY_BACKOFF.length - 1; + } + return this.pause * HConstants.RETRY_BACKOFF[triesCount]; + } + + public void deleteSnapshot(final String snapshotName) throws IOException { + HBaseProtos.SnapshotDescription snapshot = + HBaseProtos.SnapshotDescription.newBuilder().setName(snapshotName).build(); + LOG.info("Delete snapshot: " + snapshot.getName()); + server.getSnapshotManager().deleteSnapshot(snapshot); + } + + public void restoreSnapshot(final String snapshotName, final String tableName) + throws IOException { + final HBaseProtos.SnapshotDescription snapshot = HBaseProtos.SnapshotDescription.newBuilder() + .setName(snapshotName).setTable(tableName).build(); + LOG.info("Restore snapshot: " + snapshot.getName()); + server.getSnapshotManager().restoreOrCloneSnapshot(snapshot, null); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedSimpleScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedSimpleScanner.java new file mode 100644 index 0000000..ec2f2cc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/EmbeddedSimpleScanner.java @@ -0,0 +1,198 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.embedded; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.AbstractClientScanner; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class EmbeddedSimpleScanner extends AbstractClientScanner { + private static final Log LOG = LogFactory.getLog(EmbeddedSimpleScanner.class); + + protected Configuration conf; + protected Scan scan; + protected final TableName tableName; + protected HRegion currentRegion; + protected RegionScanner regionScanner; + protected HRegionServer regionServer; + + protected boolean hasMoreResults = true; + + protected volatile boolean closed = false; + + public EmbeddedSimpleScanner(Configuration conf, Scan scan, TableName tableName, + HRegionServer regionServer) throws IOException { + this.conf = conf; + this.scan = scan; + this.tableName = tableName; + this.regionServer = regionServer; + initializeScannerInConstruction(); + } + + protected void initializeScannerInConstruction() throws IOException { + nextScanner(); + } + + /** + * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the + * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no + * further, just tidy up outstanding scanners, if currentRegion != null and + * done is true. + */ + protected boolean nextScanner() throws IOException { + // Where to start the next scanner + byte[] localStartKey; + if (this.currentRegion != null) { + byte[] endKey = this.currentRegion.getRegionInfo().getEndKey(); + if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(endKey)) { + close(); + if (LOG.isTraceEnabled()) { + LOG.trace("Finished " + this.currentRegion); + } + return false; + } + localStartKey = endKey; + if (LOG.isTraceEnabled()) { + LOG.trace("Finished " + this.currentRegion); + } + } else { + localStartKey = this.scan.getStartRow(); + } + + if (LOG.isDebugEnabled() && this.currentRegion != null) { + // Only worth logging if NOT first region in scan. 
+ LOG.trace( + "Advancing internal scanner to startKey at '" + Bytes.toStringBinary(localStartKey) + "'"); + } + + try { + this.currentRegion = (HRegion) getRegion(tableName, localStartKey); + scan.withStartRow(localStartKey); + this.regionScanner = this.currentRegion.getScanner(scan); + } catch (IOException e) { + close(); + throw e; + } + return true; + } + + @Override + public Result next() throws IOException { + if (closed) { + return null; + } + + if (!hasMoreResults && !nextScanner()) { + return null; + } + + Result result = null; + + ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); + ScannerContext scannerContext = contextBuilder.build(); + + // Collect values to be returned here + List values = new ArrayList(32); + + currentRegion.startRegionOperation(Region.Operation.SCAN); + try { + hasMoreResults = regionScanner.nextRaw(values, scannerContext); + if (!values.isEmpty()) { + result = Result.create(values, null, false, false); + } + } finally { + currentRegion.closeRegionOperation(Region.Operation.SCAN); + } + + return result; + } + + @Override + public void close() { + if (closed) { + return; + } + + try { + if (regionScanner != null) { + regionScanner.close(); + regionScanner = null; + } + } catch (IOException e) { + LOG.warn("scanner failed to close.", e); + } + closed = true; + } + + @Override + public boolean renewLease() { + return false; + } + + protected boolean checkScanStopRow(final byte[] endKey) { + if (this.scan.getStopRow().length > 0) { + // there is a stop row, check to see if we are past it. + byte[] stopRow = scan.getStopRow(); + int cmp = Bytes.compareTo(stopRow, 0, stopRow.length, endKey, 0, endKey.length); + if (scan.isReversed()) { + if (cmp >= 0) { + // stopRow >= startKey (stopRow is equals to or larger than endKey) + // This is a stop. + return true; + } + } else { + if (cmp <= 0) { + // stopRow <= endKey (endKey is equals to or larger than stopRow) + // This is a stop. + return true; + } + } + } + return false; // unlikely. + } + + protected Region getRegion(TableName tableName, byte[] row) throws IOException { + byte[] regionName = regionServer.getClusterConnection().locateRegion(tableName, row) + .getRegionInfo().getRegionName(); + return regionServer.getRegionByEncodedName(HRegionInfo.encodeRegionName(regionName)); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/Operator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/Operator.java new file mode 100644 index 0000000..ae81fc6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/Operator.java @@ -0,0 +1,215 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.embedded; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; +import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; + +/** + * Operator for requests + * @since 2.0.0 + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class Operator { + final EmbeddedHBase eHBase; + final Table table; + + public Operator(EmbeddedHBase eHBase, Table table) { + this.eHBase = eHBase; + this.table = table; + } + + public TableName getTableName() { + return table.getName(); + } + + /** + * Extracts certain cells from a given row. + * @param get The object that specifies what data to fetch and from which row. + * @return The data coming from the specified row, if it exists. If the row specified doesn't + * exist, the {@link Result} instance returned won't contain any + * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. + * @throws IOException if any exception occurs. + */ + public abstract Result get(Get get) throws IOException; + + /** + * Returns a scanner on the current table as specified by the {@link Scan} object. + * @param scan A configured {@link Scan} object. + * @return A scanner. + * @throws IOException if any exception occurs. + */ + public abstract ResultScanner getScanner(Scan scan) throws IOException; + + /** + * Puts some data in the table. + * @param put The data to put. + * @throws IOException if any exception occurs. + */ + public abstract void put(Put put) throws IOException; + + /** + * Deletes the specified cells/row. + * @param delete The object that specifies what to delete. + * @throws IOException if any exception occurs. + */ + public abstract void delete(Delete delete) throws IOException; + + /** + * Appends values to one or more columns within a single row. + *
<p>
+ * This operation does not appear atomic to readers. Appends are done under a single row lock, so + * write operations to a row are synchronized, but readers do not take row locks so get and scan + * operations can see this operation partially completed. + * @param append object that specifies the columns and amounts to be used for the increment + * operations + * @throws IOException if any exception occurs. + * @return values of columns after the append operation (maybe null) + */ + public abstract Result append(Append append) throws IOException; + + /** + * Increments one or more columns within a single row. + *
<p>
+ * This operation does not appear atomic to readers. Increments are done under a single row lock, + * so write operations to a row are synchronized, but readers do not take row locks so get and + * scan operations can see this operation partially completed. + * @param increment object that specifies the columns and amounts to be used for the increment + * operations + * @throws IOException if any exception occurs. + * @return values of columns after the increment + */ + public abstract Result increment(Increment increment) throws IOException; + + /** + * Take a snapshot with FLUSH-type, which means it will flush all data in memory to persistent + * storage. Snapshots are considered unique based on the name of the snapshot. Attempts to + * take a snapshot with the same name (even a different type or with different parameters) will + * fail with a {@link org.apache.hadoop.hbase.snapshot.SnapshotCreationException} indicating the + * duplicate naming. Snapshot names follow the same naming constraints as tables in HBase. See + * {@link org.apache.hadoop.hbase.TableName#isLegalFullyQualifiedTableName(byte[])}. + * @param snapshotName name of the snapshot to be created + * @throws IOException if any exception occurs. + */ + public abstract void snapshot(final String snapshotName) throws IOException; + + /** + * Create typed snapshot of the table. Snapshots are considered unique based on the name of the + * snapshot. Attempts to take a snapshot with the same name (even a different type or with + * different parameters) will fail with a {@link SnapshotCreationException} indicating the + * duplicate naming. Snapshot names follow the same naming constraints as tables in HBase. See + * {@link org.apache.hadoop.hbase.TableName#isLegalFullyQualifiedTableName(byte[])}. + * @param snapshotName name to give the snapshot on the filesystem. Must be unique from all other + * snapshots stored on the cluster + * @param type type of snapshot to take + * @throws IOException if any exception occurs. + */ + public abstract void snapshot(final String snapshotName, SnapshotType type) throws IOException; + + /** + * Delete an existing snapshot. + * @param snapshotName name of the snapshot + * @throws IOException if any exception occurs. + */ + public abstract void deleteSnapshot(String snapshotName) throws IOException; + + /** + * Restore the specified snapshot.A snapshot of the current state is taken before executing the + * restore operation. In case of restore failure, the failsafe snapshot will be restored. If the + * restore completes without problem the failsafe snapshot is deleted. + * @param snapshotName name of the snapshot to restore + * @throws IOException if any exception occurs. + */ + public abstract void restoreSnapshot(final String snapshotName, String tableName) + throws IOException; + + /** + * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of + * execution of the actions is not defined. Meaning if you do a Put and a Get in the same + * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put + * had put. + * @param actions list of Get, Put, Delete, Increment, Append objects + * @param results Empty Object[], same size as actions. Provides access to partial results, in + * case an exception is thrown. A null in the result array means that the call for that + * action failed, even after retries. The order of the objects in the results array + * corresponds to the order of actions in the request list. 
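+ * For example, once implemented, passing {@code Arrays.asList(get, put)} together with a two-element results array would be expected to leave the Get outcome in results[0] and the Put outcome in results[1], with a null slot marking an action that failed even after retries.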
+ * @throws IOException if any exception occurs. + */ + public void batch(List actions, Object[] results) + throws IOException, InterruptedException { + // TODO this interface might still be useful for embedded mode? + } + + /** + * Test for the existence of columns in the table, as specified by the Get. + *
<p>
+ * This will return true if the Get matches one or more keys, false if not. + * @param get the Get + * @return true if the specified Get matches one or more keys, false if not + * @throws IOException if any exception occurs. + */ + public boolean exists(Get get) throws IOException { + // TODO this interface might still be useful for embedded mode? + return false; + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * performs the row mutations. If the passed value is null, the check is for the lack of column + * (ie: non-existence) + *
<p>
+ * The expected value argument of this call is on the left and the current value of the cell is on + * the right side of the comparison operator. + *
<p>
+ * Ie. eg. GREATER operator means expected value > existing <=> perform row mutations. + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @throws IOException if any exception occurs. + * @return true if the new put was executed, false otherwise + */ + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, RowMutations mutation) throws IOException { + // TODO this interface might still be useful for embedded mode? + return false; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/Options.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/Options.java new file mode 100644 index 0000000..86ffd31 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/Options.java @@ -0,0 +1,122 @@ +package org.apache.hadoop.hbase.embedded; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; +import org.apache.hadoop.hbase.util.Bytes; + +import java.util.ArrayList; +import java.util.Arrays; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class Options { + public static final byte[] DEFAULT_FAMILY = Bytes.toBytes("cf"); + private final Log LOG = LogFactory.getLog(Options.class); + + byte[] tableNameBytes; + TableName tableName; + byte[][] splitKeys; + int blockSize; + Algorithm compression; + Durability durability; + ArrayList families; + + public Options() { + this.blockSize = HColumnDescriptor.DEFAULT_BLOCKSIZE; + this.compression = Algorithm.valueOf(HColumnDescriptor.DEFAULT_COMPRESSION.toUpperCase()); + this.durability = Durability.USE_DEFAULT; + } + + public byte[] getTableNameBytes() { + return tableNameBytes; + } + + public void setTableNameBytes(byte[] tableNameBytes) { + this.tableNameBytes = tableNameBytes; + this.tableName = TableName.valueOf(tableNameBytes); + } + + public TableName getTableName() { + return tableName; + } + + public void setTableName(TableName tableName) { + this.tableName = tableName; + this.tableNameBytes = tableName.getName(); + } + + public byte[][] getSplitKeys() { + return splitKeys; + } + + public void setSplitKeys(byte[][] splitKeys) { + this.splitKeys = splitKeys; + } + + public int getBlockSize() { + return blockSize; + } + + public void setBlockSize(int blockSize) { + this.blockSize = blockSize; + } + + public Algorithm getCompression() { + return compression; + } + + public void setCompression(String compressionString) { + Algorithm compression; + try { + compression = Algorithm.valueOf(compressionString.toUpperCase()); + } catch (IllegalArgumentException e) { + LOG.debug("Unknown compression [" + compressionString + + "], supported compression algorithm including " + Arrays.asList(Algorithm.values()) + + ", please check your input", + e); + throw e; + } + this.compression = compression; + } + + public Durability getDurability() { + return durability; + } + + public void setDurability(String durabilityString) { + Durability durability; + try { + 
durability = Durability.valueOf(durabilityString.toUpperCase()); + } catch (IllegalArgumentException e) { + LOG.debug("Unknown durability [" + durabilityString + "], supported durability including " + + Arrays.asList(Durability.values()) + ", please check your input", + e); + throw e; + } + this.durability = durability; + } + + public void addColumnFamily(String familyName) { + if (families == null) { + families = new ArrayList(); + } + families.add(Bytes.toBytes(familyName)); + } + + public ArrayList getFamilies() { + return families; + } + + public static void main(String[] args) { + Options opts = new Options(); + opts.setDurability("async_wal"); + opts.setCompression("none"); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/TableOperator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/TableOperator.java new file mode 100644 index 0000000..eeb4f29 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/embedded/TableOperator.java @@ -0,0 +1,123 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.embedded; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SnapshotType; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; + +/** + * Operator based on {@link Table}, which runs the same process as in distributed mode except for no + * actual RPC happening + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class TableOperator extends Operator { + private final Admin admin; + + public TableOperator(EmbeddedHBase eHBase, Table table) throws IOException { + super(eHBase, table); + admin = eHBase.getConnection().getAdmin(); + } + + @Override + public boolean exists(Get get) throws IOException { + return table.exists(get); + } + + @Override + public void batch(List actions, Object[] results) + throws IOException, InterruptedException { + table.batch(actions, results); + } + + @Override + public Result get(Get get) throws IOException { + return table.get(get); + } + + @Override + public ResultScanner getScanner(Scan scan) throws IOException { + return table.getScanner(scan); + } + + @Override + public void put(Put put) throws IOException { + table.put(put); + } + + @Override + public void delete(Delete delete) throws IOException { + table.delete(delete); + } + + @Override + public Result append(Append append) throws IOException { + return table.append(append); + } + + @Override + public Result increment(Increment increment) throws IOException { + return table.increment(increment); + } + + @Override + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp, + byte[] value, RowMutations mutation) throws IOException { + return table.checkAndMutate(row, family, qualifier, compareOp, value, mutation); + } + + @Override + public void snapshot(String snapshotName) throws IOException { + // do snapshot with Admin + admin.snapshot(snapshotName, table.getName()); + } + + @Override + public void snapshot(String snapshotName, SnapshotType type) throws IOException { + admin.snapshot(snapshotName, table.getName(), type); + } + + @Override + public void deleteSnapshot(String snapshotName) throws IOException { + admin.deleteSnapshot(snapshotName); + } + + @Override + public void restoreSnapshot(String snapshotName, String tableName) throws IOException { + admin.restoreSnapshot(snapshotName); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index c4a4af9..f101dec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -598,7 +598,7 @@ public class HMaster extends HRegionServer implements MasterServices { */ @Override protected void 
waitForMasterActive(){ - boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf); + boolean tablesOnMaster = isEmbedded || BaseLoadBalancer.tablesOnMaster(conf); while (!(tablesOnMaster && isActiveMaster) && !isStopped() && !isAborted()) { sleeper.sleep(); @@ -638,7 +638,7 @@ public class HMaster extends HRegionServer implements MasterServices { protected void configureInfoServer() { infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class); infoServer.setAttribute(MASTER, this); - if (BaseLoadBalancer.tablesOnMaster(conf)) { + if (isEmbedded || BaseLoadBalancer.tablesOnMaster(conf)) { super.configureInfoServer(); } } @@ -810,7 +810,7 @@ public class HMaster extends HRegionServer implements MasterServices { favoredNodesManager = new FavoredNodesManager(this); } // Wait for regionserver to finish initialization. - if (BaseLoadBalancer.tablesOnMaster(conf)) { + if (isEmbedded || BaseLoadBalancer.tablesOnMaster(conf)) { waitForServerOnline(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index f27feb3..fe50350 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -41,6 +41,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerLoad; @@ -995,6 +996,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { protected ClusterStatus clusterStatus = null; protected ServerName masterServerName; protected MasterServices services; + protected boolean isEmbedded; /** * By default, regions of some small system tables such as meta, @@ -1050,6 +1052,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { Collections.addAll(tablesOnMaster, tables); } this.rackManager = new RackManager(getConf()); + this.isEmbedded = conf.getBoolean(HConstants.EMBEDDED_MODE, false); regionFinder.setConf(conf); } @@ -1230,8 +1233,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { Map> assignments = assignMasterRegions(regions, servers); if (assignments != null && !assignments.isEmpty()) { servers = new ArrayList(servers); - // Guarantee not to put other regions on master - servers.remove(masterServerName); + // Guarantee not to put other regions on master if not in embedded mode + if (!isEmbedded) servers.remove(masterServerName); List masterRegions = assignments.get(masterServerName); if (!masterRegions.isEmpty()) { regions = new ArrayList(regions); @@ -1343,8 +1346,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { return masterServerName; } servers = new ArrayList(servers); - // Guarantee not to put other regions on master - servers.remove(masterServerName); + // Guarantee not to put other regions on master if not in embedded mode + if (!isEmbedded) servers.remove(masterServerName); } int numServers = servers == null ? 
0 : servers.size(); @@ -1392,8 +1395,8 @@ public abstract class BaseLoadBalancer implements LoadBalancer { = assignMasterRegions(regions.keySet(), servers); if (assignments != null && !assignments.isEmpty()) { servers = new ArrayList(servers); - // Guarantee not to put other regions on master - servers.remove(masterServerName); + // Guarantee not to put other regions on master if not in embedded mode + if (!isEmbedded) servers.remove(masterServerName); List masterRegions = assignments.get(masterServerName); if (!masterRegions.isEmpty()) { regions = new HashMap(regions); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java index a6a0774..27d7f1a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/SimpleLoadBalancer.java @@ -256,7 +256,7 @@ public class SimpleLoadBalancer extends BaseLoadBalancer { return null; } clusterMap = new HashMap>(clusterMap); - clusterMap.remove(masterServerName); + if (!isEmbedded) clusterMap.remove(masterServerName); } long startTime = System.currentTimeMillis(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index 8825637..70afdb1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -325,7 +325,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return null; } clusterState = new HashMap>(clusterState); - clusterState.remove(masterServerName); + if (!isEmbedded) clusterState.remove(masterServerName); } // On clusters with lots of HFileLinks or lots of reference files, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantFamilySizeRegionSplitPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantFamilySizeRegionSplitPolicy.java new file mode 100644 index 0000000..24caa91 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ConstantFamilySizeRegionSplitPolicy.java @@ -0,0 +1,94 @@ +package org.apache.hadoop.hbase.regionserver; + +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.util.Bytes; + +public class ConstantFamilySizeRegionSplitPolicy extends RegionSplitPolicy { + public static final Log LOG = LogFactory.getLog(ConstantFamilySizeRegionSplitPolicy.class); + private Map familyMaxFileSizes; + + @Override + protected void configureForRegion(HRegion region) { + super.configureForRegion(region); + familyMaxFileSizes = new TreeMap(); + HTableDescriptor htd = region.getTableDesc(); + long maxFileSize = htd.getMaxFileSize(); + if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) { + maxFileSize = + getConf().getLong("hbase.hregion.max.filesize", HConstants.DEFAULT_MAX_FILE_SIZE); + } + + HColumnDescriptor[] hcds = region.getTableDesc().getColumnFamilies(); + for (HColumnDescriptor hcd : hcds) { + long 
maxFamilyFileSize = maxFileSize; + String family = hcd.getNameAsString(); + String maxFamilyFileSizeStr = hcd.getValue(HTableDescriptor.MAX_FILESIZE); + if (null != maxFamilyFileSizeStr) { + maxFamilyFileSize = Long.parseLong(maxFamilyFileSizeStr); + } + familyMaxFileSizes.put(family, maxFamilyFileSize); + } + } + + @Override + protected boolean shouldSplit() { + boolean force = region.shouldForceSplit(); + boolean foundABigStore = false; + for (Store store : region.getStores()) { + if ((!store.canSplit())) { + return false; + } + + if (store.getSize() > familyMaxFileSizes.get(store.getFamily().getNameAsString())) { + foundABigStore = true; + break; + } + } + + return foundABigStore || force; + } + + @Override + protected byte[] getSplitPoint() { + // use explicit split point first + byte[] explicitSplitPoint = this.region.getExplicitSplitPoint(); + if (explicitSplitPoint != null) { + return explicitSplitPoint; + } + + // get split point by store size ratio + List stores = region.getStores(); + byte[] splitPointFromLargestStoreRatio = null; + String splitFamilyName = null; + long splitStoreSize = 0; + float largestStoreRatio = 0; + for (Store store : stores) { + long storeSize = store.getSize(); + long maxFileSize = familyMaxFileSizes.get(store.getFamily().getNameAsString()); + float storeRatio = (float) storeSize / (float) maxFileSize; + byte[] splitPoint = store.getSplitPoint(); + if (splitPoint != null && largestStoreRatio < storeRatio) { + splitPointFromLargestStoreRatio = splitPoint; + largestStoreRatio = storeRatio; + splitFamilyName = store.getFamily().getNameAsString(); + splitStoreSize = storeSize; + } + } + + if (null != splitFamilyName) { + LOG.info("split region: " + region.getRegionInfo().getRegionNameAsString() + + " , split family: " + splitFamilyName + ", family size: " + splitStoreSize + + " , split point: " + Bytes.toString(splitPointFromLargestStoreRatio)); + } + + return splitPointFromLargestStoreRatio; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 2652301..17e3fc0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterRpcServices; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.mob.MobCacheConfig; @@ -192,6 +193,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -327,6 +329,8 @@ public class HRegionServer extends HasThread implements // space regions. 
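
Note on the new ConstantFamilySizeRegionSplitPolicy above: it splits a region as soon as any single store exceeds its family's threshold, falling back to hbase.hregion.max.filesize when a family carries no override. Below is a minimal sketch of how a table could opt in, assuming the policy is wired per table via setRegionSplitPolicyClassName (the patch itself does not register it anywhere) and keeps reading HTableDescriptor.MAX_FILESIZE from the family descriptor as the code above shows; the table and family names and sizes are made up.

import java.io.IOException;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.regionserver.ConstantFamilySizeRegionSplitPolicy;

public class PerFamilySplitSizeSketch {
  // Illustrative only: opt a table into the policy and give family "cf" its own split threshold.
  // configureForRegion() above reads the override via hcd.getValue(HTableDescriptor.MAX_FILESIZE).
  static void createTable(Admin admin) throws IOException {
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("demo"));          // hypothetical table
    htd.setRegionSplitPolicyClassName(ConstantFamilySizeRegionSplitPolicy.class.getName());
    htd.setMaxFileSize(10L * 1024 * 1024 * 1024);                                    // table-wide default: 10 GB
    HColumnDescriptor cf = new HColumnDescriptor("cf");
    cf.setValue(HTableDescriptor.MAX_FILESIZE, String.valueOf(1024L * 1024 * 1024)); // 1 GB just for "cf"
    htd.addFamily(cf);
    admin.createTable(htd);
  }
}
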
private boolean stopping = false; + private boolean shutdown = false; + private volatile boolean killed = false; protected final Configuration conf; @@ -439,7 +443,7 @@ public class HRegionServer extends HasThread implements // the specification of server hostname is optional. The hostname should be resolvable from // both master and region server @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) - final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname"; + protected final static String RS_HOSTNAME_KEY = "hbase.regionserver.hostname"; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) protected final static String MASTER_HOSTNAME_KEY = "hbase.master.hostname"; @@ -510,6 +514,11 @@ public class HRegionServer extends HasThread implements protected SecureBulkLoadManager secureBulkLoadManager; + protected final int port; + + /** whether we are in embedded mode */ + protected final boolean isEmbedded; + /** * Starts a HRegionServer at the default location. */ @@ -570,7 +579,14 @@ public class HRegionServer extends HasThread implements } String hostName = shouldUseThisHostnameInstead() ? useThisHostnameInstead : rpcServices.isa.getHostName(); - serverName = ServerName.valueOf(hostName, rpcServices.isa.getPort(), startcode); + if (rpcServices.isa != null) { + port = rpcServices.isa.getPort(); + } else { + port = (rpcServices instanceof MasterRpcServices) + ? conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT) + : conf.getInt(HConstants.REGIONSERVER_PORT, HConstants.DEFAULT_REGIONSERVER_PORT); + } + serverName = ServerName.valueOf(hostName, port, startcode); rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); @@ -603,7 +619,7 @@ public class HRegionServer extends HasThread implements if (!conf.getBoolean("hbase.testing.nocluster", false)) { // Open connection to zookeeper and set primary watcher zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" + - rpcServices.isa.getPort(), this, canCreateBaseZNode()); + port, this, canCreateBaseZNode()); this.csm = (BaseCoordinatedStateManager) csm; this.csm.initialize(this); @@ -642,6 +658,7 @@ public class HRegionServer extends HasThread implements this.compactedFileDischarger = new CompactedHFilesDischarger(cleanerInterval, (Stoppable)this, (RegionServerServices)this); choreService.scheduleChore(compactedFileDischarger); + this.isEmbedded = conf.getBoolean(HConstants.EMBEDDED_MODE, false); } private void initializeFileSystem() throws IOException { @@ -923,8 +940,10 @@ public class HRegionServer extends HasThread implements rsQuotaManager = new RegionServerQuotaManager(this); // Setup RPC client for master communication - rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress( - rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics()); + InetSocketAddress localAddress = + rpcServices.isa == null ? 
null : new InetSocketAddress(rpcServices.isa.getAddress(), 0); + rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, + clusterConnection.getConnectionMetrics()); boolean onlyMetaRefresh = false; int storefileRefreshPeriod = conf.getInt( @@ -990,8 +1009,10 @@ public class HRegionServer extends HasThread implements // since the server is ready to run rspmHost.start(); - // Start the Quota Manager - rsQuotaManager.start(getRpcServer().getScheduler()); + if (!isEmbedded) { + // Start the Quota Manager + rsQuotaManager.start(getRpcServer().getScheduler()); + } } // We registered with the Master. Go into run mode. @@ -1176,6 +1197,7 @@ public class HRegionServer extends HasThread implements LOG.info("stopping server " + this.serverName + "; zookeeper connection closed."); + this.shutdown = true; LOG.info(Thread.currentThread().getName() + " exiting"); } @@ -1416,8 +1438,7 @@ public class HRegionServer extends HasThread implements // The hostname the master sees us as. if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) { String hostnameFromMasterPOV = e.getValue(); - this.serverName = ServerName.valueOf(hostnameFromMasterPOV, - rpcServices.isa.getPort(), this.startcode); + this.serverName = ServerName.valueOf(hostnameFromMasterPOV, port, this.startcode); if (shouldUseThisHostnameInstead() && !hostnameFromMasterPOV.equals(useThisHostnameInstead)) { String msg = "Master passed us a different hostname to use; was=" + @@ -2347,6 +2368,9 @@ public class HRegionServer extends HasThread implements this.replicationSinkHandler.stopReplicationService(); } } + if (metricsRegionServer != null) { + DefaultMetricsSystem.instance().shutdown(); + } } /** @@ -2482,9 +2506,8 @@ public class HRegionServer extends HasThread implements rpcServices.rpcMultiRequestCount.reset(); rpcServices.rpcMutateRequestCount.reset(); LOG.info("reportForDuty to master=" + masterServerName + " with port=" - + rpcServices.isa.getPort() + ", startcode=" + this.startcode); + + port + ", startcode=" + this.startcode); long now = EnvironmentEdgeManager.currentTime(); - int port = rpcServices.isa.getPort(); RegionServerStartupRequest.Builder request = RegionServerStartupRequest.newBuilder(); if (shouldUseThisHostnameInstead()) { request.setUseThisHostnameInstead(useThisHostnameInstead); @@ -3650,4 +3673,8 @@ public class HRegionServer extends HasThread implements return new LockServiceClient(conf, lockStub, clusterConnection.getNonceGenerator()) .regionLock(regionInfos, description, abort); } + + protected boolean isShutDown() { + return this.shutdown; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index e6c2a49..1c2088f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -260,6 +260,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final AtomicLong scannerIdGen = new AtomicLong(0L); private final ConcurrentMap scanners = new ConcurrentHashMap<>(); + private final boolean isEmbedded; + /** * The lease timeout period for client scanners (milliseconds). */ @@ -1020,7 +1022,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } - private void closeAllScanners() { + protected void closeAllScanners() { // Close any outstanding scanners. Means they'll get an UnknownScanner // exception next time they come in. 
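
The isEmbedded branches running through these HRegionServer and RSRpcServices hunks are all driven by one configuration key. The sketch below is illustrative rather than shipped code: the test further down only sets the operator class explicitly, so treating hbase.embedded as something EmbeddedHBase.open() sets internally is an assumption here, and setting it by hand is shown only to spell out which keys the guards read.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.embedded.TableOperator;

public class EmbeddedModeConfSketch {
  // Illustrative only. With hbase.embedded=true, RSRpcServices leaves rpcServer/isa null and
  // start()/stop() skip the RPC server, HRegionServer falls back to the configured master/
  // regionserver port and skips the quota manager, and the balancers stop evicting the master
  // from assignment plans.
  static Configuration embeddedConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.EMBEDDED_MODE, true);                                 // "hbase.embedded"
    conf.set(HConstants.EMBEDDED_DB_OPERATOR_CLASS, TableOperator.class.getName());  // or DirectOperator
    return conf;
  }
}
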
for (Map.Entry e : scanners.entrySet()) { @@ -1071,17 +1073,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler, String name = rs.getProcessName() + "/" + initialIsa.toString(); // Set how many times to retry talking to another server over Connection. ConnectionUtils.setServerSideHConnectionRetriesConfig(rs.conf, name, LOG); - try { - rpcServer = RpcServerFactory.createRpcServer(rs, name, getServices(), - bindAddress, // use final bindAddress for this server. - rs.conf, - rpcSchedulerFactory.create(rs.conf, this, rs)); - rpcServer.setRsRpcServices(this); - } catch (BindException be) { - String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT : - HConstants.REGIONSERVER_PORT; - throw new IOException(be.getMessage() + ". To switch ports use the '" + configName + - "' configuration property.", be.getCause() != null ? be.getCause() : be); + + isEmbedded = rs.conf.getBoolean(HConstants.EMBEDDED_MODE, false); + if (!isEmbedded) { + try { + rpcServer = RpcServerFactory.createRpcServer(rs, name, getServices(), + bindAddress, // use final bindAddress for this server. + rs.conf, + rpcSchedulerFactory.create(rs.conf, this, rs)); + rpcServer.setRsRpcServices(this); + } catch (BindException be) { + String configName = (this instanceof MasterRpcServices) ? HConstants.MASTER_PORT : + HConstants.REGIONSERVER_PORT; + throw new IOException(be.getMessage() + ". To switch ports use the '" + configName + + "' configuration property.", be.getCause() != null ? be.getCause() : be); + } + InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + // Set our address, however we need the final port that was given to rpcServer + isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); + rpcServer.setErrorHandler(this); + rs.setName(name); + } else { + rpcServer = null; + isa = null; } scannerLeaseTimeoutPeriod = rs.conf.getInt( @@ -1096,15 +1113,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, minimumScanTimeLimitDelta = rs.conf.getLong( REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA, DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA); - - InetSocketAddress address = rpcServer.getListenerAddress(); - if (address == null) { - throw new IOException("Listener channel is closed"); - } - // Set our address, however we need the final port that was given to rpcServer - isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); - rpcServer.setErrorHandler(this); - rs.setName(name); } @Override @@ -1268,13 +1276,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return regionServer.getRegionServerQuotaManager(); } - void start() { - rpcServer.start(); + protected void start() { + if (!isEmbedded) { + rpcServer.start(); + } } - void stop() { + protected void stop() { closeAllScanners(); - rpcServer.stop(); + if (!isEmbedded) { + rpcServer.stop(); + } } /** @@ -3182,4 +3194,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return UpdateConfigurationResponse.getDefaultInstance(); } + protected void reclaimMemStoreMemory() { + regionServer.cacheFlusher.reclaimMemStoreMemory(); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/embedded/TestEmbeddedHBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/embedded/TestEmbeddedHBase.java new file mode 100644 index 0000000..8969b93 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/embedded/TestEmbeddedHBase.java @@ -0,0 +1,437 @@ 
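
The new TestEmbeddedHBase that follows exercises the embedded API end to end. As a quick orientation, here is a client-side sketch distilled from that test; it is illustrative rather than shipped code, the path and names are invented, and it assumes EmbeddedHBase.open(Options, String, Configuration), getSplitKeys(int), put/get, snapshot/restoreSnapshot/deleteSnapshot and close() behave as the test exercises them.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.embedded.EmbeddedHBase;
import org.apache.hadoop.hbase.embedded.Options;
import org.apache.hadoop.hbase.embedded.TableOperator;
import org.apache.hadoop.hbase.util.Bytes;

public class EmbeddedHBaseUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.EMBEDDED_DB_OPERATOR_CLASS, TableOperator.class.getName());

    Options opts = new Options();
    opts.setTableName(TableName.valueOf("example"));     // hypothetical table name
    opts.setSplitKeys(EmbeddedHBase.getSplitKeys(10));    // pre-split, as the test does
    opts.addColumnFamily("cf");                           // mirrors Options.DEFAULT_FAMILY

    // Open an embedded instance rooted at a local path, write and read one cell, take and
    // restore a snapshot, then close.
    EmbeddedHBase hbase = EmbeddedHBase.open(opts, "/tmp/embedded-hbase-example", conf);
    try {
      byte[] family = Bytes.toBytes("cf");
      byte[] qualifier = Bytes.toBytes("c1");
      Put put = new Put(Bytes.toBytes("row-1"));
      put.addColumn(family, qualifier, Bytes.toBytes("v1"));
      hbase.put(put);

      Result result = hbase.get(new Get(Bytes.toBytes("row-1")));
      System.out.println(Bytes.toString(result.getValue(family, qualifier)));

      hbase.snapshot("example-snapshot");                 // Admin-backed in TableOperator
      hbase.restoreSnapshot("example-snapshot");
      hbase.deleteSnapshot("example-snapshot");
    } finally {
      hbase.close();
    }
  }
}
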
+/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.embedded; + +import static org.apache.hadoop.hbase.HBaseCommonTestingUtility.BASE_TEST_DIRECTORY_KEY; +import static org.apache.hadoop.hbase.HBaseCommonTestingUtility.DEFAULT_BASE_TEST_DIRECTORY; + +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category(SmallTests.class) +public class TestEmbeddedHBase { + + @Rule + public TestName name = new TestName(); + + @Parameterized.Parameter + public String operatorClass; + + private EmbeddedHBase hbase = null; + private String path = null; + private final Log LOG = LogFactory.getLog(TestEmbeddedHBase.class); + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + { TableOperator.class.getName() }, + { DirectOperator.class.getName() } + }); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + DefaultMetricsSystem.setMiniClusterMode(true); + } + + @Before + public void setUpBeforeTest() throws Exception { + Options opts = new Options(); + String testName = name.getMethodName().replaceAll("\\[", "-").replaceAll("\\]", ""); + path = System.getProperty(BASE_TEST_DIRECTORY_KEY, DEFAULT_BASE_TEST_DIRECTORY) 
+ File.separator + + testName; + LOG.debug("Test path: " + path); + opts.setTableName(TableName.valueOf(testName)); + opts.setSplitKeys(EmbeddedHBase.getSplitKeys(10)); + // opts.setDurability("async_wal"); + Configuration conf = HBaseConfiguration.create(); + conf.set(HConstants.EMBEDDED_DB_OPERATOR_CLASS, operatorClass); + conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024); + LOG.debug("Opeator class: " + operatorClass); + hbase = EmbeddedHBase.open(opts, path, conf); + } + + @After + public void tearDownAfterTest() throws Exception { + if (hbase != null) { + hbase.close(); + } + if (path != null) { + FileUtils.deleteDirectory(new File(path)); + } + } + + @Test + public void testPutAndGet() throws Exception { + final int THREAD_NUM = 100; + final byte[] family = Bytes.toBytes("cf"); + final byte[] qualifier = Bytes.toBytes("c1"); + final AtomicInteger successCnt = new AtomicInteger(0); + ArrayList threads = new ArrayList<>(THREAD_NUM); + for (int i = 0; i < THREAD_NUM; i++) { + final int index = i; + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + final byte[] row = Bytes.toBytes("row-" + index); + final byte[] value = Bytes.toBytes("v" + index); + try { + Put put = new Put(row); + put.addColumn(family, qualifier, value); + hbase.put(put); + Get get = new Get(row); + Result result = hbase.get(get); + byte[] returnedValue = result.getValue(family, qualifier); + if (Bytes.equals(value, returnedValue)) { + successCnt.getAndIncrement(); + } else { + LOG.error("Should be equal but not, original value: " + Bytes.toString(value) + + ", returned value: " + + (returnedValue == null ? "null" : Bytes.toString(returnedValue))); + } + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }); + threads.add(t); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + Assert.assertEquals(THREAD_NUM, successCnt.get()); + } + + @Test + public void testAppendAndDelete() throws Exception { + // FIXME the test fails with more threads, we need to locate the root cause and fix it + final int THREAD_NUM = 2; + final byte[] family = Bytes.toBytes("cf"); + final byte[] qualifier = Bytes.toBytes("c1"); + final byte[] row = Bytes.toBytes("row"); + final byte[] value = Bytes.toBytes("v"); + Get g = new Get(row); + LOG.debug("Result before delete: " + hbase.get(g)); + // cleanup first to avoid dirty state caused by previous failure before append + Delete delete = new Delete(row); + hbase.delete(delete); + LOG.debug("Result before append: " + hbase.get(g)); + // test append + ArrayList threads = new ArrayList<>(THREAD_NUM); + for (int i = 0; i < THREAD_NUM; i++) { + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + try { + Append append = new Append(row); + append.add(family, qualifier, value); + hbase.append(append); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }); + threads.add(t); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + Get get = new Get(row); + Result result = hbase.get(get); + byte[] actualResult = result.getValue(family, qualifier); + LOG.debug("Result: " + Bytes.toString(actualResult)); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < THREAD_NUM; i++) { + builder.append("v"); + } + Assert.assertEquals(builder.toString(), Bytes.toString(actualResult)); + // test delete again + delete = new Delete(row); + hbase.delete(delete); + Threads.sleep(100); + get = new Get(row); + Result checkDelete = 
hbase.get(get); + LOG.debug("Result after delete: " + Bytes.toString(checkDelete.getValue(family, qualifier))); + Assert.assertTrue("Should be empty after delete but actually not", checkDelete.isEmpty()); + } + + @Test + public void testIncrement() throws Exception { + // FIXME the test fails with more threads, we need to locate the root cause and fix it + final int THREAD_NUM = 2; + final byte[] family = Bytes.toBytes("cf"); + final byte[] qualifier = Bytes.toBytes("c1"); + final byte[] row = Bytes.toBytes("row"); + Get g = new Get(row); + LOG.debug("Result before delete: " + hbase.get(g)); + // cleanup first to avoid dirty state caused by previous failure before append + Delete delete = new Delete(row); + hbase.delete(delete); + LOG.debug("Result before increment: " + hbase.get(g)); + // test append + ArrayList threads = new ArrayList<>(THREAD_NUM); + for (int i = 0; i < THREAD_NUM; i++) { + Thread t = new Thread(new Runnable() { + + @Override + public void run() { + try { + Increment incre = new Increment(row); + incre.addColumn(family, qualifier, 1L); + hbase.increment(incre); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + }); + threads.add(t); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + Get get = new Get(row); + Result result = hbase.get(get); + long actualResult = Bytes.toLong(result.getValue(family, qualifier)); + LOG.debug("Result: " + actualResult); + Assert.assertEquals(THREAD_NUM, actualResult); + // test delete again + delete = new Delete(row); + hbase.delete(delete); + Threads.sleep(100); + get = new Get(row); + Result checkDelete = hbase.get(get); + LOG.debug("Result after delete: " + Bytes.toString(checkDelete.getValue(family, qualifier))); + Assert.assertTrue("Should be empty after delete but actually not", checkDelete.isEmpty()); + } + + @Test + public void testScan() throws Exception { + final int THREAD_NUM = 100; + final int DATA_NUM = 200; + final int INTER = (int) Math.ceil(1.0 * DATA_NUM / THREAD_NUM); + final byte[] family = Bytes.toBytes("cf"); + final byte[] qualifier = Bytes.toBytes("c1"); + + final AtomicInteger successCnt = new AtomicInteger(0); + ArrayList threads = new ArrayList<>(THREAD_NUM); + for (int i = 0; i < THREAD_NUM; i++) { + final int threadId = i; + Thread t = new Thread(new Runnable() { + @Override + public void run() { + int start = INTER * threadId; + int end = Math.min(INTER * (threadId + 1), DATA_NUM); + + for (int j = start; j < end; j++) { + final byte[] row = Bytes.toBytes(String.format("user%04d", j)); + final byte[] value = Bytes.toBytes("v" + j); + try { + Put put = new Put(row); + put.addColumn(family, qualifier, value); + hbase.put(put); + Scan scan = new Scan().withStartRow(row); + scan.setFilter(new SingleColumnValueFilter("cf".getBytes(), "c1".getBytes(), + CompareFilter.CompareOp.EQUAL, value)); + ResultScanner scanner = hbase.getResultScanner(scan); + Result result = scanner.next(); + if (result == null) { + return; + } + byte[] returnedValue = result.getValue(family, qualifier); + if (Bytes.equals(value, returnedValue)) { + successCnt.getAndIncrement(); + } else { + LOG.error("Should be equal but not, original value: " + Bytes.toString(value) + + ", returned value: " + + (returnedValue == null ? 
"null" : Bytes.toString(returnedValue))); + } + + Thread.sleep(100); // for low pressure in put + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + } + }); + threads.add(t); + } + + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + Assert.assertEquals(DATA_NUM, successCnt.get()); + + ResultScanner scanner = null; + + try { + for (int i = 0; i <= DATA_NUM; i++) { + Scan scan = new Scan().withStartRow(Bytes.toBytes(String.format("user%04d", i))); + scanner = hbase.getResultScanner(scan); + + int count = 0; + while (scanner.next() != null) { + count++; + } + + Assert.assertEquals(DATA_NUM - i, count); + } + } finally { + if (scanner != null) { + scanner.close(); + scanner = null; + } + } + + // empty start row for reverse scan + try { + Scan emptyRowScan = new Scan(); + emptyRowScan.withStartRow(HConstants.EMPTY_START_ROW); + emptyRowScan.setReversed(true); + scanner = hbase.getResultScanner(emptyRowScan); + + Result result = null; + int count = 0; + while ((result = scanner.next()) != null) { + LOG.debug("Result: " + result); + count++; + } + + Assert.assertEquals(0, count); + } finally { + if (scanner != null) { + scanner.close(); + scanner = null; + } + } + + // Reverse scan test + try { + for (int i = 0; i <= DATA_NUM; i++) { + Scan scan = new Scan().withStartRow(Bytes.toBytes(String.format("user%04d", i))); + scan.setReversed(true); + scanner = hbase.getResultScanner(scan); + + int count = 0; + while (scanner.next() != null) { + count++; + } + + if (i < DATA_NUM) { + Assert.assertEquals(i + 1, count); + } else { + Assert.assertEquals(DATA_NUM, count); + } + } + } finally { + if (scanner != null) { + scanner.close(); + scanner = null; + } + } + } + + @Test(timeout = 60000) + public void testSnapshot() throws Exception { + final int ROW_NUM = 100; + final byte[] family = Bytes.toBytes("cf"); + final byte[] qualifier = Bytes.toBytes("c1"); + final String snapshotName = name.getMethodName().replaceAll("\\[", "-").replaceAll("\\]", ""); + // Put some data + for (int i = 0; i < ROW_NUM; i++) { + byte[] row = Bytes.toBytes("row-" + i); + byte[] value = Bytes.toBytes("v-" + i); + Put put = new Put(row); + put.addColumn(family, qualifier, value); + hbase.put(put); + } + // cleanup first to avoid dirty state caused by previous failure before snapshot + try { + hbase.deleteSnapshot(snapshotName); + LOG.debug("Snapshot deleted: " + snapshotName); + } catch (SnapshotDoesNotExistException e) { + // OK if snapshot doesn't exist + } + // do snapshot + hbase.snapshot(snapshotName); + // do some modification + final byte[] rowAfterSnapshot = Bytes.toBytes("rowAfterSnapshot"); + Put put = new Put(rowAfterSnapshot); + put.addColumn(family, qualifier, rowAfterSnapshot); + hbase.put(put); + Assert.assertArrayEquals(rowAfterSnapshot, + hbase.get(new Get(rowAfterSnapshot)).getValue(family, qualifier)); + // do restore + hbase.restoreSnapshot(snapshotName); + // make sure data restored + Assert.assertTrue(hbase.get(new Get(rowAfterSnapshot)).isEmpty()); + Assert.assertArrayEquals(Bytes.toBytes("v-1"), + hbase.get(new Get(Bytes.toBytes("row-1"))).getValue(family, qualifier)); + } +}