diff --git hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
index 8a73839..ad46921 100644
--- hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
+++ hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java
@@ -88,4 +88,12 @@ public interface ProcedureStore {
    * @param procId the ID of the procedure to remove.
    */
   void delete(long procId);
+
+  void start(int numThreads) throws IOException;
+
+  void stop();
+
+  /*
+  Procedure get(long procId);
+  */
 }
\ No newline at end of file
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index a80a07e..6c2c6d7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -42,20 +42,30 @@ import org.mortbay.log.Log;
 @InterfaceAudience.Private
 public class ClientSideRegionScanner extends AbstractClientScanner {
 
-  private HRegion region;
-  RegionScanner scanner;
+  private final HRegion region;
+  private RegionScanner scanner;
   List<Cell> values;
 
   public ClientSideRegionScanner(Configuration conf, FileSystem fs,
       Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
       throws IOException {
+    // open region from the snapshot directory
+    this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+
     // region is immutable, set isolation level
     scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
 
-    // open region from the snapshot directory
-    this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+    init(this.region, scan, scanMetrics);
+  }
+
+  @InterfaceAudience.Private
+  public ClientSideRegionScanner(HRegion region, Scan scan) throws IOException {
+    this.region = region;
+    init(region, scan, null);
+  }
 
+  private void init(HRegion region, Scan scan, ScanMetrics scanMetrics) throws IOException {
     // create an internal region scanner
     this.scanner = region.getScanner(scan);
     values = new ArrayList<Cell>();
@@ -98,7 +108,6 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
     if (this.scanner != null) {
       try {
         this.scanner.close();
-        this.scanner = null;
       } catch (IOException ex) {
         Log.warn("Exception while closing scanner", ex);
       }
@@ -107,7 +116,6 @@
       try {
         this.region.closeRegionOperation();
         this.region.close(true);
-        this.region = null;
       } catch (IOException ex) {
         Log.warn("Exception while closing region", ex);
       }
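Review note: the new second constructor makes the scanner usable against a region the caller already holds open, which is what EmbeddedTable.getScanner() relies on later in this patch. A hedged usage sketch (the helper and its name are illustrative, not part of the patch):

```java
import java.io.IOException;

import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;

public class ClientSideScanSketch {
  // Hypothetical helper: dump an already-open region's rows by scanning it
  // in-process, bypassing the RPC stack. Note that close() on this scanner
  // also closes the underlying region (see the close() hunk above), so the
  // scanner is deliberately not closed here, matching the debugging main()
  // at the end of this patch.
  static void dumpRows(HRegion region) throws IOException {
    ResultScanner scanner = new ClientSideRegionScanner(region, new Scan());
    for (Result result : scanner) {
      System.out.println(result);
    }
  }
}
```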
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 95771eb..247b7c6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -22,6 +22,7 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
@@ -47,6 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Service;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -109,12 +111,14 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
-import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
+import org.apache.hadoop.hbase.procedure2.store.RegionProcedureStore;
 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.quotas.RegionStateListener;
+import org.apache.hadoop.hbase.regionserver.BootstrapTableService;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@@ -296,7 +300,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   private MasterQuotaManager quotaManager;
 
   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
-  private WALProcedureStore procedureStore;
+  private ProcedureStore procedureStore;
 
   // handle table states
   private TableStateManager tableStateManager;
@@ -307,6 +311,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
   /** jetty server for master to redirect requests to regionserver infoServer */
   private org.mortbay.jetty.Server masterJettyServer;
 
+  private BootstrapTableService bootstrapTableService;
+
   public static class RedirectServlet extends HttpServlet {
     private static final long serialVersionUID = 2894774810058302472L;
     private static int regionServerInfoPort;
@@ -603,6 +609,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     status.setStatus("Initializing Master file system");
 
     this.masterActiveTime = System.currentTimeMillis();
+
+    // Start bootstrap table service to serve small amounts of persisted data
+    bootstrapTableService = new BootstrapTableService(conf, serverName);
+    bootstrapTableService.start();
+
     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
     this.fileSystemManager = new MasterFileSystem(this, this);
 
@@ -1068,8 +1079,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
     final Path logDir = new Path(fileSystemManager.getRootDir(),
         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
 
-    procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
-        new MasterProcedureEnv.WALStoreLeaseRecovery(this));
+    procedureStore = new RegionProcedureStore(bootstrapTableService.createConnection());
     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
         procEnv.getProcedureQueue());
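The net effect on master startup is a short, ordered bootstrap chain: the bootstrap table service must be online before the region-backed store can hand out tables. A condensed wiring sketch (the standalone helper and its parameter names are illustrative; the constructor calls mirror the patch):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.RegionProcedureStore;
import org.apache.hadoop.hbase.regionserver.BootstrapTableService;

public class WiringSketch {
  // Illustrative helper mirroring the patched HMaster code path:
  // 1. start the bootstrap table service so its region and WAL are online,
  // 2. back the procedure store with an embedded connection to it,
  // 3. build the procedure executor on top of the store.
  static ProcedureExecutor<MasterProcedureEnv> wire(Configuration conf, ServerName serverName,
      MasterProcedureEnv procEnv, int numThreads) throws IOException {
    BootstrapTableService bootstrapTableService = new BootstrapTableService(conf, serverName);
    bootstrapTableService.start();
    ProcedureStore store = new RegionProcedureStore(bootstrapTableService.createConnection());
    store.start(numThreads); // currently a no-op, kept for interface symmetry
    return new ProcedureExecutor<MasterProcedureEnv>(conf, procEnv, store,
        procEnv.getProcedureQueue());
  }
}
```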
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/RegionProcedureStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/RegionProcedureStore.java
new file mode 100644
index 0000000..17412c7
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/RegionProcedureStore.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.store;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A ProcedureStore implementation where we keep the state of the procedures in a single region.
+ * The procId is the row key, and the state of the procedure is serialized in the info:data column.
+ */
+@InterfaceAudience.Private
+public class RegionProcedureStore implements ProcedureStore {
+
+  private static final Log LOG = LogFactory.getLog(RegionProcedureStore.class);
+
+  public static final TableName PROCEDURES_TABLE_NAME = TableName.valueOf(
+      NamespaceDescriptor.SYSTEM_NAMESPACE_NAME, Bytes.toBytes("procedures"));
+
+  private static final byte[] FAMILY = Bytes.toBytes("info");
+  private static final byte[] QUALIFIER = Bytes.toBytes("data");
+
+  private static final HTableDescriptor TABLE_DESCRIPTOR
+      = new HTableDescriptor(RegionProcedureStore.PROCEDURES_TABLE_NAME)
+          .addFamily(new HColumnDescriptor(RegionProcedureStore.FAMILY));
+
+  private final Connection connection;
+  private final Table table;
+
+  public static HTableDescriptor getTableDescriptor() {
+    return TABLE_DESCRIPTOR;
+  }
+
+  public RegionProcedureStore(Connection connection) throws IOException {
+    this.connection = connection;
+    this.table = connection.getTable(PROCEDURES_TABLE_NAME);
+  }
+
+  @Override
+  public void registerListener(ProcedureStoreListener listener) {
+  }
+
+  @Override
+  public boolean unregisterListener(ProcedureStoreListener listener) {
+    return false;
+  }
+
+  @Override
+  public void recoverLease() throws IOException {
+    // no op. Already done in BootstrapTableService
+  }
+
+  @Override
+  public void start(int numThreads) throws IOException {
+  }
+
+  @Override
+  public void stop() {
+    try {
+      connection.close();
+    } catch (IOException e) {
+      LOG.warn("Received IOException closing the connection", e);
+    }
+  }
+
+  @Override
+  public Iterator<Procedure> load() throws IOException {
+    ResultScanner scanner = table.getScanner(new Scan());
+
+    final Iterator<Result> raw = scanner.iterator();
+
+    return new Iterator<Procedure>() {
+      @Override
+      public boolean hasNext() {
+        return raw.hasNext();
+      }
+
+      @Override
+      public Procedure next() {
+        Result result = raw.next();
+        if (result == null) {
+          return null;
+        }
+        Procedure proc;
+        try {
+          proc = deserializeProcedure(result);
+          return proc;
+        } catch (IOException e) {
+          LOG.warn("Received exception, skipping Procedure", e);
+          if (hasNext()) {
+            return next();
+          } else {
+            return null;
+          }
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  @Override
+  public void insert(Procedure proc, Procedure[] subprocs) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("insert " + proc + " subproc=" + Arrays.toString(subprocs));
+    }
+
+    try {
+      Put put = serializeProcedure(proc);
+      List<Put> puts;
+      if (subprocs != null) {
+        puts = new ArrayList<Put>(subprocs.length + 1);
+        puts.add(put);
+        for (int i = 0; i < subprocs.length; ++i) {
+          puts.add(serializeProcedure(subprocs[i]));
+        }
+        table.put(puts); // atomic across rows: EmbeddedTable.put(List) uses HRegion.mutateRowsWithLocks()
+      } else {
+        assert !proc.hasParent();
+        table.put(put);
+      }
+    } catch (IOException e) {
+      // We are not able to serialize the procedure.
+      // This is a code error, and we are not able to go on.
+      LOG.fatal("Unable to serialize one of the procedures: proc=" + proc
+          + " subprocs=" + Arrays.toString(subprocs), e);
+      throw new RuntimeException(e);
+    }
+  }
+
+ LOG.fatal("Unable to serialize the procedure: proc=" + proc, e); + throw new RuntimeException(e); + } + } + + @Override + public void delete(long procId) { + try { + if (LOG.isTraceEnabled()) { + LOG.trace("delete " + procId); + } + Delete delete = new Delete(Bytes.toBytes(procId)); + table.delete(delete); + } catch (IOException e) { + // We are not able to serialize the procedure. + // this is a code error, and we are not able to go on. + LOG.fatal("Unable to serialize the procedure: " + procId, e); + throw new RuntimeException(e); + } + } + + protected Put serializeProcedure(Procedure proc) + throws IOException { + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Procedure.serializeProcedure(out, proc); + + Put put = new Put(Bytes.toBytes(proc.getProcId())); + put.addImmutable(FAMILY, QUALIFIER, out.toByteArray()); + return put; + } + + protected Procedure deserializeProcedure(Result result) + throws IOException { + + long procId = Bytes.toLong(result.getRow()); + Cell cell = result.getColumnLatestCell(FAMILY, QUALIFIER); + if (cell == null) { + throw new IOException("Deserializing of procedure failed from Result: " + result); + } + InputStream in = new ByteArrayInputStream(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength()); + + return Procedure.deserializeProcedure(procId, in); + } +} diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapTableService.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapTableService.java new file mode 100644 index 0000000..df8ac85 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapTableService.java @@ -0,0 +1,598 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapTableService.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapTableService.java
new file mode 100644
index 0000000..df8ac85
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/BootstrapTableService.java
@@ -0,0 +1,598 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.ClientSideRegionScanner;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
+import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.procedure2.store.RegionProcedureStore;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WALFactory;
+
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
+/**
+ * A bootstrap table is:
+ * - Only 1 region
+ * - Not assigned through regular assignment, independent of meta and of other tables
+ * - Hosted on only 1 server (typically the master)
+ * - Has a dedicated WAL
+ * - Embedded
+ * - Not visible through regular means
+ */
+@InterfaceAudience.Private
+public class BootstrapTableService {
+  private static final Log LOG = LogFactory.getLog(BootstrapTableService.class);
+
+  // TODO: implement Service from guava
+
+  private static final String BOOTSTRAP_TABLE_DIR = "bootstrap-tables";
+
+  private final FileSystem fs;
+  private final Path rootdir;
+  private final Configuration conf;
+  private final ServerName serverName;
+
+  private WALFactory walFactory;
+
+  /** Similar to the first meta region, regions of bootstrap tables are created with
+   * region_id = 1 */
+  private static final long DEFAULT_REGION_ID = 1L;
+
+  private final ConcurrentHashMap<TableName, HTableDescriptor> tables = new ConcurrentHashMap<>();
+
+  /** We cannot use the region server's onlineRegions map, since that would make these regions
+   * visible to clients */
+  protected final Map<TableName, HRegion> onlineRegions = new ConcurrentHashMap<>();
+
+  public BootstrapTableService(Configuration conf, ServerName serverName)
+      throws IOException {
+    this.conf = new Configuration(conf); // clone the conf since we are going to change it
+
+    // pass the bootstrap-tables directory as the root directory for the WAL
+    Path rootDir = FSUtils.getRootDir(this.conf);
+    this.conf.set(HConstants.HBASE_DIR, new Path(rootDir, BOOTSTRAP_TABLE_DIR).toString());
+    this.rootdir = FSUtils.getRootDir(this.conf);
+
+    this.fs = this.rootdir.getFileSystem(this.conf);
+
+    this.serverName = serverName;
+  }
+
+  // TODO: maybe decouple this into services
+  private void addBootstrappedTables() {
+    HTableDescriptor procedureTable = RegionProcedureStore.getTableDescriptor();
+
+    tables.put(procedureTable.getTableName(), procedureTable);
+
+    for (HTableDescriptor htd : tables.values()) {
+      // disable splitting for these tables
+      htd.setRegionSplitPolicyClassName(
+          "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
+    }
+  }
+
+  public void start() throws IOException {
+    addBootstrappedTables();
+
+    List<HRegionInfo> regionInfos = bootstrap();
+
+    recoverWALs();
+
+    setupWAL();
+
+    openRegions(regionInfos);
+  }
+
+  public void stop() throws IOException {
+    // TODO: close all the regions and WAL files
+  }
+
+  private void recoverWALs() {
+    // TODO
+    // rename previous WAL directories to splitting
+    // call recover lease
+    // move/rewrite the WALs to recovered.edits per region
+  }
+
+  private void setupWAL() throws IOException {
+    final String logName = DefaultWALProvider.getWALDirectoryName(this.serverName.toString());
+
+    Path logdir = new Path(rootdir, logName);
+    if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
+    if (this.fs.exists(logdir)) {
+      throw new IOException("BootstrapTableService has already "
+          + "created a directory at " + logdir);
+    }
+    walFactory = new WALFactory(conf, null, serverName.toString());
+  }
+
+  private List<HRegionInfo> bootstrap() throws IOException {
+    List<HRegionInfo> regionInfos = new ArrayList<>();
+
+    checkAndCreateDirectory(rootdir);
+
+    // all bootstrap tables should be in the hbase namespace
+    Path nsDir = FSUtils.getNamespaceDir(rootdir, NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR);
+    checkAndCreateDirectory(nsDir);
+
+    for (HTableDescriptor htd : tables.values()) {
+      HRegionInfo regionInfo = checkAndCreateTableAndRegionDir(htd);
+      regionInfos.add(regionInfo);
+    }
+    return regionInfos;
+  }
+
+  /**
+   * Checks whether the directory exists, and creates it otherwise.
+   * @param dir the directory to create
+   * @throws IOException
+   */
+  private void checkAndCreateDirectory(Path dir) throws IOException {
+    if (!fs.exists(dir)) {
+      LOG.info("Creating directory: " + dir);
+      fs.mkdirs(dir);
+    }
+  }
+
+  /** Returns the only possible regionInfo for the table */
+  private HRegionInfo getRegionInfo(HTableDescriptor htd) {
+    return new HRegionInfo(htd.getTableName(), HConstants.EMPTY_START_ROW,
+        HConstants.EMPTY_END_ROW, false, DEFAULT_REGION_ID);
+  }
+
+  private HRegionInfo checkAndCreateTableAndRegionDir(HTableDescriptor htd) throws IOException {
+    Path tableDir = FSUtils.getTableDir(rootdir, htd.getTableName());
+    checkAndCreateDirectory(tableDir);
+
+    HRegionInfo regionInfo = getRegionInfo(htd);
+
+    HRegionFileSystem regionFs = new HRegionFileSystem(conf, fs, tableDir, regionInfo);
+
+    if (!fs.exists(regionFs.getRegionDir())) {
+      LOG.info("Creating directory: " + regionFs.getRegionDir());
+      // TODO: we should also check that regionInfo exists and is readable. What if the following
+      // fails after the dir is created but before regionInfo is persisted?
+      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, regionInfo);
+    }
+    return regionInfo;
+  }
+
+  private void openRegions(List<HRegionInfo> regionInfos) throws IOException {
+    for (HRegionInfo regionInfo : regionInfos) {
+      HTableDescriptor htd = tables.get(regionInfo.getTable());
+
+      // TODO: WAL file cleaning?
+
+      // TODO: periodic flusher, global flusher etc. cannot see these regions
+      HRegion region = HRegion.openHRegion(rootdir, regionInfo, htd,
+          walFactory.getWAL(regionInfo.getEncodedNameAsBytes()), conf, null, null);
+
+      onlineRegions.put(htd.getTableName(), region);
+    }
+  }
+
+  public Connection createConnection() {
+    return new EmbeddedConnection();
+  }
+
+  class EmbeddedConnection implements Connection {
+    @Override
+    public void abort(String why, Throwable e) {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean isAborted() {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public Table getTable(TableName tableName) throws IOException {
+      HRegion region = onlineRegions.get(tableName);
+      if (region == null) {
+        throw new TableNotFoundException(tableName);
+      }
+      return new EmbeddedTable(region);
+    }
+
+    @Override
+    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public Admin getAdmin() throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // noop
+    }
+
+    @Override
+    public boolean isClosed() {
+      // TODO Auto-generated method stub
+      return false;
+    }
+  }
+
+  static class EmbeddedTable implements Table {
+    private final HRegion region;
+
+    EmbeddedTable(HRegion region) {
+      this.region = region;
+    }
+
+    @Override
+    public TableName getName() {
+      return region.getTableDesc().getTableName();
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return region.conf;
+    }
+
+    @Override
+    public HTableDescriptor getTableDescriptor() throws IOException {
+      return region.getTableDesc();
+    }
+
+    @Override
+    public boolean exists(Get get) throws IOException {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public boolean[] existsAll(List<Get> gets) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public void batch(List<? extends Row> actions, Object[] results) throws IOException,
+        InterruptedException {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public <R> void batchCallback(List<? extends Row> actions, Object[] results,
+        Callback<R> callback) throws IOException, InterruptedException {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public <R> Object[] batchCallback(List<? extends Row> actions, Callback<R> callback)
+        throws IOException, InterruptedException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public Result get(Get get) throws IOException {
+      return region.get(get);
+    }
+
+    @Override
+    public Result[] get(List<Get> gets) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public ResultScanner getScanner(Scan scan) throws IOException {
+      return new ClientSideRegionScanner(region, scan);
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family) throws IOException {
+      return getScanner(new Scan().addFamily(family));
+    }
+
+    @Override
+    public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
+      return getScanner(new Scan().addColumn(family, qualifier));
+    }
+
+    @Override
+    public void put(Put put) throws IOException {
+      region.put(put);
+    }
+
+    @Override
+    public void put(List<Put> puts) throws IOException {
+      List<Mutation> mutations = new ArrayList<Mutation>(puts.size()); // sigh, this should not be needed.
+      for (Put put : puts) {
+        mutations.add(put);
+      }
+      atomicMultiMutate(mutations);
+    }
+
+    /**
+     * Do an atomic multi-row mutation. The code is copied from MultiRowMutationEndpoint.
+     * @param mutations the mutations to apply atomically
+     * @throws IOException
+     */
+    private void atomicMultiMutate(Collection<Mutation> mutations) throws IOException {
+      // set of rows to lock, sorted to avoid deadlocks
+      SortedSet<byte[]> rowsToLock = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+
+      for (Mutation m : mutations) {
+        rowsToLock.add(m.getRow());
+      }
+
+      // call utility method on region
+      region.mutateRowsWithLocks(mutations, rowsToLock);
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
+        throws IOException {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+        byte[] value, Put put) throws IOException {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public void delete(Delete delete) throws IOException {
+      region.delete(delete);
+    }
+
+    @Override
+    public void delete(List<Delete> deletes) throws IOException {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value,
+        Delete delete) throws IOException {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+        byte[] value, Delete delete) throws IOException {
+      // TODO Auto-generated method stub
+      return false;
+    }
+
+    @Override
+    public void mutateRow(RowMutations rm) throws IOException {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public Result append(Append append) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public Result increment(Increment increment) throws IOException {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
+        throws IOException {
+      // TODO Auto-generated method stub
+      return 0;
+    }
+
+    @Override
+    public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+        Durability durability) throws IOException {
+      // TODO Auto-generated method stub
+      return 0;
+    }
+
+    @Override
+    public void close() throws IOException {
+      // noop
+    }
+
+    @Override
+    public CoprocessorRpcChannel coprocessorService(byte[] row) {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
+        byte[] startKey, byte[] endKey, Call<T, R> callable) throws ServiceException, Throwable {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
+        byte[] endKey, Call<T, R> callable, Callback<R> callback) throws ServiceException,
+        Throwable {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public long getWriteBufferSize() {
+      // TODO Auto-generated method stub
+      return 0;
+    }
+
+    @Override
+    public void setWriteBufferSize(long writeBufferSize) throws IOException {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public <R extends Message> Map<byte[], R> batchCoprocessorService(
+        MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
+        R responsePrototype) throws ServiceException, Throwable {
+      // TODO Auto-generated method stub
+      return null;
+    }
+
+    @Override
+    public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
+        Message request, byte[] startKey, byte[] endKey, R responsePrototype,
+        Callback<R> callback) throws ServiceException, Throwable {
+      // TODO Auto-generated method stub
+    }
+
+    @Override
+    public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+        byte[] value, RowMutations mutation) throws IOException {
+      // TODO Auto-generated method stub
+      return false;
+    }
+  }
+
+  // TODO: for debugging, remove
+  public static void main(String[] args) throws IOException {
+    BootstrapTableService service = new BootstrapTableService(HBaseConfiguration.create(),
+        ServerName.valueOf("localhost", 16010, EnvironmentEdgeManager.currentTime()));
+    service.start();
+
+    try (Connection connection = service.createConnection();
+        Table table = connection.getTable(RegionProcedureStore.PROCEDURES_TABLE_NAME)) {
+      table.put(new Put(Bytes.toBytes("5")).addColumn(Bytes.toBytes("info"),
+          null, Bytes.toBytes("42")));
+
+      System.out.println(table.get(new Get(Bytes.toBytes("5"))));
+
+      List<Put> puts = new ArrayList<>();
+      puts.add(new Put(Bytes.toBytes("10")).addColumn(Bytes.toBytes("info"),
+          null, Bytes.toBytes("42")));
+      puts.add(new Put(Bytes.toBytes("11")).addColumn(Bytes.toBytes("info"),
+          null, Bytes.toBytes("42")));
+      puts.add(new Put(Bytes.toBytes("1")).addColumn(Bytes.toBytes("info"),
+          null, Bytes.toBytes("42")));
+
+      table.put(puts);
+
+      ResultScanner scanner = table.getScanner(new Scan());
+
+      for (Result result : scanner) {
+        System.out.println(result);
+      }
+    }
+  }
+}
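To tie the two new classes together, a hedged end-to-end sketch in the spirit of the main() above: stand the service up, then drive the region-backed procedure store through it. This assumes a usable hbase.rootdir filesystem; the class name is illustrative and this is not part of the patch:

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.RegionProcedureStore;
import org.apache.hadoop.hbase.regionserver.BootstrapTableService;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

public class RegionProcedureStoreDemo {
  public static void main(String[] args) throws IOException {
    BootstrapTableService service = new BootstrapTableService(HBaseConfiguration.create(),
        ServerName.valueOf("localhost", 16010, EnvironmentEdgeManager.currentTime()));
    service.start();

    RegionProcedureStore store = new RegionProcedureStore(service.createConnection());
    store.start(1); // currently a no-op

    // After a restart, procedures written by a previous incarnation come back
    // from the single backing region, in ascending procId order.
    Iterator<Procedure> it = store.load();
    while (it.hasNext()) {
      System.out.println("recovered: " + it.next());
    }

    store.stop(); // closes the embedded connection
    service.stop();
  }
}
```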