diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java index 996a289..5193299 100644 --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.List; @@ -34,24 +32,10 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.thrift.generated.Hbase; import org.apache.hadoop.hbase.thrift.generated.IOError; import org.apache.hadoop.hbase.thrift.generated.TRowResult; -import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer; -import org.apache.hadoop.hbase.thrift.TBoundedThreadPoolServer.Args; -import org.apache.hadoop.hbase.thrift.ThriftServer; +import org.apache.hadoop.hbase.thrift.ThriftServerRunner; import org.apache.hadoop.hbase.thrift.ThriftUtilities; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.server.TNonblockingServer; -import org.apache.thrift.server.TServer; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TNonblockingServerSocket; -import org.apache.thrift.transport.TNonblockingServerTransport; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TServerTransport; -import org.apache.thrift.transport.TTransportFactory; /** * HRegionThriftServer - this class starts up a Thrift server in the same @@ -68,43 +52,59 @@ public class HRegionThriftServer extends Thread { public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class); public static final int DEFAULT_LISTEN_PORT = 9090; - private HRegionServer rs; - private Configuration conf; - - private int port; - private boolean nonblocking; - private String bindIpAddress; - private String transport; - private String protocol; - volatile private TServer tserver; - - /** - * Whether requests should be redirected to other RegionServers if the - * specified region is not hosted by this RegionServer. - */ - private boolean redirect; + private final HRegionServer rs; + private final ThriftServerRunner serverRunner; /** * Create an instance of the glue object that connects the * RegionServer with the standard ThriftServer implementation */ - HRegionThriftServer(HRegionServer regionServer, Configuration conf) { + HRegionThriftServer(HRegionServer regionServer, Configuration conf) + throws IOException { + super("Region Thrift Server"); this.rs = regionServer; - this.conf = conf; + this.serverRunner = + new ThriftServerRunner(conf, new HBaseHandlerRegion(conf)); } /** - * Inherit the Handler from the standard ThriftServer. This allows us + * Stop ThriftServer + */ + void shutdown() { + serverRunner.shutdown(); + } + + @Override + public void run() { + serverRunner.run(); + } + + /** + * Inherit the Handler from the standard ThriftServerRunner. This allows us * to use the default implementation for most calls. 
We override certain calls * for performance reasons */ - private class HBaseHandlerRegion extends ThriftServer.HBaseHandler { + private class HBaseHandlerRegion extends ThriftServerRunner.HBaseHandler { + + /** + * Whether requests should be redirected to other RegionServers if the + * specified region is not hosted by this RegionServer. + */ + private boolean redirect; HBaseHandlerRegion(final Configuration conf) throws IOException { super(conf); initialize(conf); } + /** + * Read and initialize config parameters + */ + private void initialize(Configuration conf) { + this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect", + false); + } + // TODO: Override more methods to short-circuit for performance /** @@ -153,91 +153,4 @@ public class HRegionThriftServer extends Thread { } } } - - /** - * Read and initialize config parameters - */ - private void initialize(Configuration conf) { - this.port = conf.getInt("hbase.regionserver.thrift.port", - DEFAULT_LISTEN_PORT); - this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress"); - this.protocol = conf.get("hbase.regionserver.thrift.protocol"); - this.transport = conf.get("hbase.regionserver.thrift.transport"); - this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking", - false); - this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect", - false); - } - - /** - * Stop ThriftServer - */ - void shutdown() { - if (tserver != null) { - tserver.stop(); - tserver = null; - } - } - - @Override - public void run() { - try { - HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf); - Hbase.Processor processor = - new Hbase.Processor(handler); - - TProtocolFactory protocolFactory; - if (this.protocol != null && this.protocol.equals("compact")) { - protocolFactory = new TCompactProtocol.Factory(); - } else { - protocolFactory = new TBinaryProtocol.Factory(); - } - - if (this.nonblocking) { - TNonblockingServerTransport serverTransport = - new TNonblockingServerSocket(this.port); - TFramedTransport.Factory transportFactory = - new TFramedTransport.Factory(); - - TNonblockingServer.Args serverArgs = - new TNonblockingServer.Args(serverTransport); - serverArgs.processor(processor); - serverArgs.transportFactory(transportFactory); - serverArgs.protocolFactory(protocolFactory); - LOG.info("starting HRegionServer Nonblocking Thrift server on " + - this.port); - LOG.info("HRegionServer Nonblocking Thrift server does not " + - "support address binding."); - tserver = new TNonblockingServer(serverArgs); - } else { - InetAddress listenAddress = null; - if (this.bindIpAddress != null) { - listenAddress = InetAddress.getByName(this.bindIpAddress); - } else { - listenAddress = InetAddress.getLocalHost(); - } - TServerTransport serverTransport = new TServerSocket( - new InetSocketAddress(listenAddress, port)); - - TTransportFactory transportFactory; - if (this.transport != null && this.transport.equals("framed")) { - transportFactory = new TFramedTransport.Factory(); - } else { - transportFactory = new TTransportFactory(); - } - - TBoundedThreadPoolServer.Args serverArgs = - new TBoundedThreadPoolServer.Args(serverTransport, conf); - serverArgs.processor(processor); - serverArgs.protocolFactory(protocolFactory); - serverArgs.transportFactory(transportFactory); - LOG.info("starting HRegionServer ThreadPool Thrift server on " + - listenAddress + ":" + this.port); - tserver = new TBoundedThreadPoolServer(serverArgs); - } - tserver.serve(); - } catch (Exception e) { - LOG.warn("Unable to start 
HRegionServerThrift interface.", e); - } - } } diff --git src/main/java/org/apache/hadoop/hbase/thrift/AbstractNonblockingServer.java src/main/java/org/apache/hadoop/hbase/thrift/AbstractNonblockingServer.java new file mode 100644 index 0000000..eec3b75 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/thrift/AbstractNonblockingServer.java @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides common methods and classes used by nonblocking TServer + * implementations. + */ +public abstract class AbstractNonblockingServer extends TServer { + protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); + + public static abstract class AbstractNonblockingServerArgs> extends AbstractServerArgs { + public long maxReadBufferBytes = Long.MAX_VALUE; + + public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { + super(transport); + transportFactory(new TFramedTransport.Factory()); + } + } + + /** + * The maximum amount of memory we will allocate to client IO buffers at a + * time. Without this limit, the server will gladly allocate client buffers + * right into an out of memory exception, rather than waiting. + */ + private final long MAX_READ_BUFFER_BYTES; + + /** + * How many bytes are currently allocated to read buffers. + */ + private final AtomicLong readBufferBytesAllocated = new AtomicLong(0); + + public AbstractNonblockingServer(AbstractNonblockingServerArgs args) { + super(args); + MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; + } + + /** + * Begin accepting connections and processing invocations. 
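+ * The sequence is fixed: start the IO threads (startThreads()), open the + * listen socket (startListening()), mark the server as serving, block in + * waitForShutdown(), and finally stop listening.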
+ */ + public void serve() { + // start any IO threads + if (!startThreads()) { + return; + } + + // start listening, or exit + if (!startListening()) { + return; + } + + setServing(true); + + // this will block while we serve + waitForShutdown(); + + setServing(false); + + // do a little cleanup + stopListening(); + } + + /** + * Starts any threads required for serving. + * + * @return true if everything went ok, false if threads could not be started. + */ + protected abstract boolean startThreads(); + + /** + * A method that will block until when threads handling the serving have been + * shut down. + */ + protected abstract void waitForShutdown(); + + /** + * Have the server transport start accepting connections. + * + * @return true if we started listening successfully, false if something went + * wrong. + */ + protected boolean startListening() { + try { + serverTransport_.listen(); + return true; + } catch (TTransportException ttx) { + LOGGER.error("Failed to start listening on server socket!", ttx); + return false; + } + } + + /** + * Stop listening for connections. + */ + protected void stopListening() { + serverTransport_.close(); + } + + /** + * Perform an invocation. This method could behave several different ways - + * invoke immediately inline, queue for separate execution, etc. + * + * @return true if invocation was successfully requested, which is not a + * guarantee that invocation has completed. False if the request + * failed. + */ + protected abstract boolean requestInvoke(FrameBuffer frameBuffer); + + /** + * An abstract thread that handles selecting on a set of transports and + * {@link FrameBuffer FrameBuffers} associated with selected keys + * corresponding to requests. + */ + protected abstract class AbstractSelectThread extends Thread { + protected final Selector selector; + + // List of FrameBuffers that want to change their selection interests. + protected final Set selectInterestChanges = new HashSet(); + + public AbstractSelectThread() throws IOException { + this.selector = SelectorProvider.provider().openSelector(); + } + + /** + * If the selector is blocked, wake it up. + */ + public void wakeupSelector() { + selector.wakeup(); + } + + /** + * Add FrameBuffer to the list of select interest changes and wake up the + * selector if it's blocked. When the select() call exits, it'll give the + * FrameBuffer a chance to change its interests. + */ + public void requestSelectInterestChange(FrameBuffer frameBuffer) { + synchronized (selectInterestChanges) { + selectInterestChanges.add(frameBuffer); + } + // wakeup the selector, if it's currently blocked. + selector.wakeup(); + } + + /** + * Check to see if there are any FrameBuffers that have switched their + * interest type from read to write or vice versa. + */ + protected void processInterestChanges() { + synchronized (selectInterestChanges) { + for (FrameBuffer fb : selectInterestChanges) { + fb.changeSelectInterests(); + } + selectInterestChanges.clear(); + } + } + + /** + * Do the work required to read from a readable client. If the frame is + * fully read, then invoke the method call. + */ + protected void handleRead(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (!buffer.read()) { + cleanupSelectionKey(key); + return; + } + + // if the buffer's frame read is complete, invoke the method. + if (buffer.isFrameFullyRead()) { + if (!requestInvoke(buffer)) { + cleanupSelectionKey(key); + } + } + } + + /** + * Let a writable client get written, if there's data to be written. 
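+ * A failed write is treated as a dead connection; the selection key is + * cleaned up.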
+ */ + protected void handleWrite(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (!buffer.write()) { + cleanupSelectionKey(key); + } + } + + /** + * Do connection-close cleanup on a given SelectionKey. + */ + protected void cleanupSelectionKey(SelectionKey key) { + // remove the records from the two maps + FrameBuffer buffer = (FrameBuffer) key.attachment(); + if (buffer != null) { + // close the buffer + buffer.close(); + } + // cancel the selection key + key.cancel(); + } + } // SelectThread + + /** + * Possible states for the FrameBuffer state machine. + */ + private enum FrameBufferState { + // in the midst of reading the frame size off the wire + READING_FRAME_SIZE, + // reading the actual frame data now, but not all the way done yet + READING_FRAME, + // completely read the frame, so an invocation can now happen + READ_FRAME_COMPLETE, + // waiting to get switched to listening for write events + AWAITING_REGISTER_WRITE, + // started writing response data, not fully complete yet + WRITING, + // another thread wants this framebuffer to go back to reading + AWAITING_REGISTER_READ, + // we want our transport and selection key invalidated in the selector + // thread + AWAITING_CLOSE + } + + /** + * Class that implements a sort of state machine around the interaction with a + * client and an invoker. It manages reading the frame size and frame data, + * getting it handed off as wrapped transports, and then the writing of + * response data back to the client. In the process it manages flipping the + * read and write bits on the selection key for its client. + */ + protected class FrameBuffer { + // the actual transport hooked up to the client. + public final TNonblockingTransport trans_; + + // the SelectionKey that corresponds to our transport + private final SelectionKey selectionKey_; + + // the SelectThread that owns the registration of our transport + private final AbstractSelectThread selectThread_; + + // where in the process of reading/writing are we? + private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE; + + // the ByteBuffer we'll be using to write and read, depending on the state + private ByteBuffer buffer_; + + private TByteArrayOutputStream response_; + + public FrameBuffer(final TNonblockingTransport trans, + final SelectionKey selectionKey, + final AbstractSelectThread selectThread) { + trans_ = trans; + selectionKey_ = selectionKey; + selectThread_ = selectThread; + buffer_ = ByteBuffer.allocate(4); + } + + /** + * Give this FrameBuffer a chance to read. The selector loop should have + * received a read event for this FrameBuffer. + * + * @return true if the connection should live on, false if it should be + * closed + */ + public boolean read() { + if (state_ == FrameBufferState.READING_FRAME_SIZE) { + // try to read the frame size completely + if (!internalRead()) { + return false; + } + + // if the frame size has been read completely, then prepare to read the + // actual frame. + if (buffer_.remaining() == 0) { + // pull out the frame size as an integer. + int frameSize = buffer_.getInt(0); + if (frameSize <= 0) { + LOGGER.error("Read an invalid frame size of " + frameSize + + ". Are you using TFramedTransport on the client side?"); + return false; + } + + // if this frame will always be too large for this server, log the + // error and close the connection. 
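+ // (read() signals this by returning false, and the select thread then + // cleans up the selection key and closes the transport.)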
+ if (frameSize > MAX_READ_BUFFER_BYTES) { + LOGGER.error("Read a frame size of " + frameSize + + ", which is bigger than the maximum allowable buffer size for ALL connections."); + return false; + } + + // if this frame will push us over the memory limit, then return. + // with luck, more memory will free up the next time around. + if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { + return true; + } + + // increment the amount of memory allocated to read buffers + readBufferBytesAllocated.addAndGet(frameSize); + + // reallocate the readbuffer as a frame-sized buffer + buffer_ = ByteBuffer.allocate(frameSize); + + state_ = FrameBufferState.READING_FRAME; + } else { + // this skips the check of READING_FRAME state below, since we can't + // possibly go on to that state if there's data left to be read at + // this one. + return true; + } + } + + // it is possible to fall through from the READING_FRAME_SIZE section + // to READING_FRAME if there's already some frame data available once + // READING_FRAME_SIZE is complete. + + if (state_ == FrameBufferState.READING_FRAME) { + if (!internalRead()) { + return false; + } + + // since we're already in the select loop here for sure, we can just + // modify our selection key directly. + if (buffer_.remaining() == 0) { + // get rid of the read select interests + selectionKey_.interestOps(0); + state_ = FrameBufferState.READ_FRAME_COMPLETE; + } + + return true; + } + + // if we fall through to this point, then the state must be invalid. + LOGGER.error("Read was called but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to write its output to the final client. + */ + public boolean write() { + if (state_ == FrameBufferState.WRITING) { + try { + if (trans_.write(buffer_) < 0) { + return false; + } + } catch (IOException e) { + LOGGER.warn("Got an IOException during write!", e); + return false; + } + + // we're done writing. now we need to switch back to reading. + if (buffer_.remaining() == 0) { + prepareRead(); + } + return true; + } + + LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to set its interest to write, once data + * has come in. + */ + public void changeSelectInterests() { + if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) { + // set the OP_WRITE interest + selectionKey_.interestOps(SelectionKey.OP_WRITE); + state_ = FrameBufferState.WRITING; + } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) { + prepareRead(); + } else if (state_ == FrameBufferState.AWAITING_CLOSE) { + close(); + selectionKey_.cancel(); + } else { + LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")"); + } + } + + /** + * Shut the connection down. + */ + public void close() { + // if we're being closed due to an error, we might have allocated a + // buffer that we need to subtract for our memory accounting. + if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) { + readBufferBytesAllocated.addAndGet(-buffer_.array().length); + } + trans_.close(); + } + + /** + * Check if this FrameBuffer has a full frame read. + */ + public boolean isFrameFullyRead() { + return state_ == FrameBufferState.READ_FRAME_COMPLETE; + } + + /** + * After the processor has processed the invocation, whatever thread is + * managing invocations should call this method on this FrameBuffer so we + * know it's time to start trying to write again. 
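+ * (The response bytes were staged into response_ by getOutputTransport() + * while the processor ran.)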
Also, if it turns out that + * there actually isn't any data in the response buffer, we'll skip trying + * to write and instead go back to reading. + */ + public void responseReady() { + // the read buffer is definitely no longer in use, so we will decrement + // our read buffer count. we do this here as well as in close because + // we'd like to free this read memory up as quickly as possible for other + // clients. + readBufferBytesAllocated.addAndGet(-buffer_.array().length); + + if (response_.len() == 0) { + // go straight to reading again. this was probably an oneway method + state_ = FrameBufferState.AWAITING_REGISTER_READ; + buffer_ = null; + } else { + buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); + + // set state that we're waiting to be switched to write. we do this + // asynchronously through requestSelectInterestChange() because there is + // a possibility that we're not in the main thread, and thus currently + // blocked in select(). (this functionality is in place for the sake of + // the HsHa server.) + state_ = FrameBufferState.AWAITING_REGISTER_WRITE; + } + requestSelectInterestChange(); + } + + /** + * Actually invoke the method signified by this FrameBuffer. + */ + public void invoke() { + TTransport inTrans = getInputTransport(); + TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); + TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); + + try { + processorFactory_.getProcessor(inTrans).process(inProt, outProt); + responseReady(); + return; + } catch (TException te) { + LOGGER.warn("Exception while invoking!", te); + } catch (Throwable t) { + LOGGER.error("Unexpected throwable while invoking!", t); + } + // This will only be reached when there is a throwable. + state_ = FrameBufferState.AWAITING_CLOSE; + requestSelectInterestChange(); + } + + /** + * Wrap the read buffer in a memory-based transport so a processor can read + * the data it needs to handle an invocation. + */ + private TTransport getInputTransport() { + return new TMemoryInputTransport(buffer_.array()); + } + + /** + * Get the transport that should be used by the invoker for responding. + */ + private TTransport getOutputTransport() { + response_ = new TByteArrayOutputStream(); + return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); + } + + /** + * Perform a read into buffer. + * + * @return true if the read succeeded, false if there was an error or the + * connection closed. + */ + private boolean internalRead() { + try { + if (trans_.read(buffer_) < 0) { + return false; + } + return true; + } catch (IOException e) { + LOGGER.warn("Got an IOException in internalRead!", e); + return false; + } + } + + /** + * We're done writing, so reset our interest ops and change state + * accordingly. + */ + private void prepareRead() { + // we can set our interest directly without using the queue because + // we're in the select thread. + selectionKey_.interestOps(SelectionKey.OP_READ); + // get ready for another go-around + buffer_ = ByteBuffer.allocate(4); + state_ = FrameBufferState.READING_FRAME_SIZE; + } + + /** + * When this FrameBuffer needs to change its select interests and execution + * might not be in its select thread, then this method will make sure the + * interest change gets done when the select thread wakes back up. When the + * current thread is this FrameBuffer's select thread, then it just does the + * interest change immediately. 
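+ * Checking the current thread avoids a needless selector wakeup when the + * caller is already on the select thread.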
+ */ + private void requestSelectInterestChange() { + if (Thread.currentThread() == this.selectThread_) { + changeSelectInterests(); + } else { + this.selectThread_.requestSelectInterestChange(this); + } + } + } // FrameBuffer +} diff --git src/main/java/org/apache/hadoop/hbase/thrift/Invocation.java src/main/java/org/apache/hadoop/hbase/thrift/Invocation.java new file mode 100644 index 0000000..f38ae4f --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/thrift/Invocation.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import org.apache.hadoop.hbase.thrift.AbstractNonblockingServer.FrameBuffer; + +/** + * An Invocation represents a method call that is prepared to execute, given + * an idle worker thread. It contains the input and output protocols the + * thread's processor should use to perform the usual Thrift invocation. + */ +class Invocation implements Runnable { + private final FrameBuffer frameBuffer; + + public Invocation(final FrameBuffer frameBuffer) { + this.frameBuffer = frameBuffer; + } + + public void run() { + frameBuffer.invoke(); + } +} diff --git src/main/java/org/apache/hadoop/hbase/thrift/TThreadedSelectorServer.java src/main/java/org/apache/hadoop/hbase/thrift/TThreadedSelectorServer.java new file mode 100644 index 0000000..4d6d103 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/thrift/TThreadedSelectorServer.java @@ -0,0 +1,691 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + + package org.apache.hadoop.hbase.thrift; + + import java.io.IOException; + import java.nio.channels.SelectionKey; + import java.nio.channels.Selector; + import java.nio.channels.spi.SelectorProvider; + import java.util.ArrayList; + import java.util.Collection; + import java.util.Collections; + import java.util.HashSet; + import java.util.Iterator; + import java.util.Set; + import java.util.concurrent.ArrayBlockingQueue; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ExecutorService; + import java.util.concurrent.Executors; + import java.util.concurrent.LinkedBlockingQueue; + import java.util.concurrent.RejectedExecutionException; + import java.util.concurrent.TimeUnit; + + import org.apache.hadoop.conf.Configuration; + import org.apache.thrift.transport.TNonblockingServerTransport; + import org.apache.thrift.transport.TNonblockingTransport; + import org.apache.thrift.transport.TTransportException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + * A Half-Sync/Half-Async server with a separate pool of threads to handle + * non-blocking I/O. Accepts are handled on a single thread, and a configurable + * number of nonblocking selector threads manage reading and writing of client + * connections. A synchronous worker thread pool handles processing of requests. + * + * Performs better than TNonblockingServer/THsHaServer in multi-core + * environments when the bottleneck is CPU on the single selector thread + * handling I/O. In addition, because the accept handling is decoupled from + * reads/writes and invocation, the server has better ability to handle back- + * pressure from new connections (e.g. stop accepting when busy). + * + * Like TNonblockingServer, it relies on the use of TFramedTransport. + */ + public class TThreadedSelectorServer extends AbstractNonblockingServer { + private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName()); + + /** + * Number of selector threads for reading and writing sockets + */ + public static final String SELECTOR_THREADS_CONF_KEY = + "hbase.thrift.selector.threads"; + + /** + * Number of threads for processing the thrift calls + */ + public static final String WORKER_THREADS_CONF_KEY = + "hbase.thrift.worker.threads"; + + /** + * Time to wait for server to stop gracefully + */ + public static final String STOP_TIMEOUT_CONF_KEY = + "hbase.thrift.stop.timeout.seconds"; + + /** + * Maximum number of accepted elements per selector + */ + public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY = + "hbase.thrift.accept.queue.size.per.selector"; + + /** + * The strategy for handling new accepted connections. + */ + public static final String ACCEPT_POLICY_CONF_KEY = + "hbase.thrift.accept.policy"; + + public static class Args extends AbstractNonblockingServerArgs { + + /** The number of threads for selecting on already-accepted connections */ + public int selectorThreads = 2; + /** + * The size of the executor service (if none is specified) that will handle + * invocations.
This may be set to 0, in which case invocations will be + * handled directly on the selector threads (as is in TNonblockingServer) + */ + private int workerThreads = 5; + /** Time to wait for server to stop gracefully */ + private int stopTimeoutVal = 60; + private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + /** The ExecutorService for handling dispatched requests */ + private ExecutorService executorService = null; + /** + * The size of the blocking queue per selector thread for passing accepted + * connections to the selector thread + */ + private int acceptQueueSizePerThread = 4; + + /** + * Determines the strategy for handling new accepted connections. + */ + public static enum AcceptPolicy { + /** + * Require accepted connection registration to be handled by the executor. + * If the worker pool is saturated, further accepts will be closed + * immediately. Slightly increases latency due to an extra scheduling. + */ + FAIR_ACCEPT, + /** + * Handle the accepts as fast as possible, disregarding the status of the + * executor service. + */ + FAST_ACCEPT + } + + private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT; + + public Args(TNonblockingServerTransport transport, Configuration conf) { + super(transport); + readConf(conf); + } + + private void readConf(Configuration conf) { + selectorThreads = conf.getInt(SELECTOR_THREADS_CONF_KEY, selectorThreads); + workerThreads = conf.getInt(WORKER_THREADS_CONF_KEY, workerThreads); + stopTimeoutVal = conf.getInt(STOP_TIMEOUT_CONF_KEY, stopTimeoutVal); + acceptQueueSizePerThread = conf.getInt( + ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY, acceptQueueSizePerThread); + acceptPolicy = AcceptPolicy.valueOf(conf.get( + ACCEPT_POLICY_CONF_KEY, acceptPolicy.toString()).toUpperCase()); + } + + public Args selectorThreads(int i) { + selectorThreads = i; + return this; + } + + public int getSelectorThreads() { + return selectorThreads; + } + + public Args workerThreads(int i) { + workerThreads = i; + return this; + } + + public int getWorkerThreads() { + return workerThreads; + } + + public int getStopTimeoutVal() { + return stopTimeoutVal; + } + + public Args stopTimeoutVal(int stopTimeoutVal) { + this.stopTimeoutVal = stopTimeoutVal; + return this; + } + + public TimeUnit getStopTimeoutUnit() { + return stopTimeoutUnit; + } + + public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) { + this.stopTimeoutUnit = stopTimeoutUnit; + return this; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public Args executorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + public int getAcceptQueueSizePerThread() { + return acceptQueueSizePerThread; + } + + public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) { + this.acceptQueueSizePerThread = acceptQueueSizePerThread; + return this; + } + + public AcceptPolicy getAcceptPolicy() { + return acceptPolicy; + } + + public Args acceptPolicy(AcceptPolicy acceptPolicy) { + this.acceptPolicy = acceptPolicy; + return this; + } + + public void validate() { + if (selectorThreads <= 0) { + throw new IllegalArgumentException("selectorThreads must be positive."); + } + if (workerThreads < 0) { + throw new IllegalArgumentException("workerThreads must be non-negative."); + } + if (acceptQueueSizePerThread <= 0) { + throw new IllegalArgumentException("acceptQueueSizePerThread must be positive."); + } + } + } + + // Flag for stopping the server + private volatile boolean stopped_ = true; + + // The thread handling all accepts 
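+ // (created and started in startThreads(); its selector is woken up in + // stop() so the thread can observe stopped_ and exit)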
+ private AcceptThread acceptThread; + + // Threads handling events on client transports + private final Set selectorThreads = new HashSet(); + + // This wraps all the functionality of queueing and thread pool management + // for the passing of Invocations from the selector thread(s) to the workers + // (if any). + private final ExecutorService invoker; + + private final Args args; + + /** + * Create the server with the specified Args configuration + */ + public TThreadedSelectorServer(Args args) { + super(args); + args.validate(); + invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService; + this.args = args; + } + + /** + * Start the accept and selector threads running to deal with clients. + * + * @return true if everything went ok, false if we couldn't start for some + * reason. + */ + @Override + protected boolean startThreads() { + try { + for (int i = 0; i < args.selectorThreads; ++i) { + selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread)); + } + acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_, + createSelectorThreadLoadBalancer(selectorThreads)); + stopped_ = false; + for (SelectorThread thread : selectorThreads) { + thread.start(); + } + acceptThread.start(); + return true; + } catch (IOException e) { + LOGGER.error("Failed to start threads!", e); + return false; + } + } + + /** + * Joins the accept and selector threads and shuts down the executor service. + */ + @Override + protected void waitForShutdown() { + try { + joinThreads(); + } catch (InterruptedException e) { + // Non-graceful shutdown occurred + LOGGER.error("Interrupted while joining threads!", e); + } + gracefullyShutdownInvokerPool(); + } + + protected void joinThreads() throws InterruptedException { + // wait until the io threads exit + acceptThread.join(); + for (SelectorThread thread : selectorThreads) { + thread.join(); + } + } + + /** + * Stop serving and shut everything down. + */ + @Override + public void stop() { + stopped_ = true; + + // Stop queuing connect attempts asap + stopListening(); + + if (acceptThread != null) { + acceptThread.wakeupSelector(); + } + if (selectorThreads != null) { + for (SelectorThread thread : selectorThreads) { + if (thread != null) + thread.wakeupSelector(); + } + } + } + + protected void gracefullyShutdownInvokerPool() { + // try to gracefully shut down the executor service + invoker.shutdown(); + + // Loop until awaitTermination finally does return without an interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear its task queue, closing client sockets + // appropriately. + long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal); + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) { + try { + invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + /** + * We override the standard invoke method here to queue the invocation for + * the invoker service instead of immediately invoking.
If there is no thread + * pool, handle the invocation inline on this thread. + */ + @Override + protected boolean requestInvoke(FrameBuffer frameBuffer) { + Runnable invocation = getRunnable(frameBuffer); + if (invoker != null) { + try { + invoker.execute(invocation); + return true; + } catch (RejectedExecutionException rx) { + LOGGER.warn("ExecutorService rejected execution!", rx); + return false; + } + } else { + // Invoke on the caller's thread + invocation.run(); + return true; + } + } + + protected Runnable getRunnable(FrameBuffer frameBuffer) { + return new Invocation(frameBuffer); + } + + /** + * Helper to create the invoker if one is not specified + */ + protected static ExecutorService createDefaultExecutor(Args options) { + return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null; + } + + private static BlockingQueue createDefaultAcceptQueue(int queueSize) { + if (queueSize == 0) { + // Unbounded queue + return new LinkedBlockingQueue(); + } + return new ArrayBlockingQueue(queueSize); + } + + /** + * The thread that selects on the server transport (listen socket) and accepts + * new connections to hand off to the IO selector threads + */ + protected class AcceptThread extends Thread { + + // The listen socket to accept on + private final TNonblockingServerTransport serverTransport; + private final Selector acceptSelector; + + private final SelectorThreadLoadBalancer threadChooser; + + /** + * Set up the AcceptThread + * + * @throws IOException + */ + public AcceptThread(TNonblockingServerTransport serverTransport, + SelectorThreadLoadBalancer threadChooser) throws IOException { + this.serverTransport = serverTransport; + this.threadChooser = threadChooser; + this.acceptSelector = SelectorProvider.provider().openSelector(); + this.serverTransport.registerSelector(acceptSelector); + } + + /** + * The work loop. Selects on the server transport and accepts. If there was + * a server transport that had blocking accepts, and returned on blocking + * client transports, that should be used instead + */ + public void run() { + try { + while (!stopped_) { + select(); + } + } catch (Throwable t) { + LOGGER.error("run() exiting due to uncaught error", t); + } finally { + // This will wake up the selector threads + TThreadedSelectorServer.this.stop(); + } + } + + /** + * If the selector is blocked, wake it up. + */ + public void wakeupSelector() { + acceptSelector.wakeup(); + } + + /** + * Select and process IO events appropriately: If there are connections to + * be accepted, accept them. + */ + private void select() { + try { + // wait for connect events. + acceptSelector.select(); + + // process the io events we received + Iterator selectedKeys = acceptSelector.selectedKeys().iterator(); + while (!stopped_ && selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + + // skip if not valid + if (!key.isValid()) { + continue; + } + + if (key.isAcceptable()) { + handleAccept(); + } else { + LOGGER.warn("Unexpected state in select! " + key.interestOps()); + } + } + } catch (IOException e) { + LOGGER.warn("Got an IOException while selecting!", e); + } + } + + /** + * Accept a new connection.
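+ * Under FAIR_ACCEPT the hand-off goes through the invoker pool, so a + * saturated pool rejects the registration and the connection is closed + * immediately; FAST_ACCEPT registers it on this thread regardless of the + * pool's state.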
+ */ + private void handleAccept() { + final TNonblockingTransport client = doAccept(); + if (client != null) { + // Pass this connection to a selector thread + final SelectorThread targetThread = threadChooser.nextThread(); + + if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) { + doAddAccept(targetThread, client); + } else { + // FAIR_ACCEPT + try { + invoker.submit(new Runnable() { + public void run() { + doAddAccept(targetThread, client); + } + }); + } catch (RejectedExecutionException rx) { + LOGGER.warn("ExecutorService rejected accept registration!", rx); + // close immediately + client.close(); + } + } + } + } + + private TNonblockingTransport doAccept() { + try { + return (TNonblockingTransport) serverTransport.accept(); + } catch (TTransportException tte) { + // something went wrong accepting. + LOGGER.warn("Exception trying to accept!", tte); + return null; + } + } + + private void doAddAccept(SelectorThread thread, TNonblockingTransport client) { + if (!thread.addAcceptedConnection(client)) { + client.close(); + } + } + } // AcceptThread + + /** + * The SelectorThread(s) will be doing all the selecting on accepted active + * connections. + */ + protected class SelectorThread extends AbstractSelectThread { + + // Accepted connections added by the accept thread. + private final BlockingQueue acceptedQueue; + + /** + * Set up the SelectorThread with an unbounded queue for incoming accepts. + * + * @throws IOException + * if a selector cannot be created + */ + public SelectorThread() throws IOException { + this(new LinkedBlockingQueue()); + } + + /** + * Set up the SelectorThread with a bounded queue for incoming accepts. + * + * @throws IOException + * if a selector cannot be created + */ + public SelectorThread(int maxPendingAccepts) throws IOException { + this(createDefaultAcceptQueue(maxPendingAccepts)); + } + + /** + * Set up the SelectorThread with a specified queue for connections. + * + * @param acceptedQueue + * The BlockingQueue implementation for holding incoming accepted + * connections. + * @throws IOException + * if a selector cannot be created. + */ + public SelectorThread(BlockingQueue acceptedQueue) throws IOException { + this.acceptedQueue = acceptedQueue; + } + + /** + * Hands off an accepted connection to be handled by this thread. This + * method will block if the queue for new connections is at capacity. + * + * @param accepted + * The connection that has been accepted. + * @return true if the connection has been successfully added. + */ + public boolean addAcceptedConnection(TNonblockingTransport accepted) { + try { + acceptedQueue.put(accepted); + } catch (InterruptedException e) { + LOGGER.warn("Interrupted while adding accepted connection!", e); + return false; + } + selector.wakeup(); + return true; + } + + /** + * The work loop. Handles selecting (read/write IO), dispatching, and + * managing the selection preferences of all existing connections.
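+ * Once stopped_ is set, the loop exits, any remaining selection keys are + * cleaned up, and stop() is called so that the accept thread and the other + * selector threads wake up as well.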
+ */ + public void run() { + try { + while (!stopped_) { + select(); + processAcceptedConnections(); + processInterestChanges(); + } + for (SelectionKey selectionKey : selector.keys()) { + cleanupSelectionKey(selectionKey); + } + } catch (Throwable t) { + LOGGER.error("run() exiting due to uncaught error", t); + } finally { + // This will wake up the accept thread and the other selector threads + TThreadedSelectorServer.this.stop(); + } + } + + /** + * Select and process IO events appropriately: If there are existing + * connections with data waiting to be read, read it, buffering until a + * whole frame has been read. If there are any pending responses, buffer + * them until their target client is available, and then send the data. + */ + private void select() { + try { + // wait for io events. + selector.select(); + + // process the io events we received + Iterator selectedKeys = selector.selectedKeys().iterator(); + while (!stopped_ && selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + + // skip if not valid + if (!key.isValid()) { + cleanupSelectionKey(key); + continue; + } + + if (key.isReadable()) { + // deal with reads + handleRead(key); + } else if (key.isWritable()) { + // deal with writes + handleWrite(key); + } else { + LOGGER.warn("Unexpected state in select! " + key.interestOps()); + } + } + } catch (IOException e) { + LOGGER.warn("Got an IOException while selecting!", e); + } + } + + private void processAcceptedConnections() { + // Register accepted connections + while (!stopped_) { + TNonblockingTransport accepted = acceptedQueue.poll(); + if (accepted == null) { + break; + } + registerAccepted(accepted); + } + } + + private void registerAccepted(TNonblockingTransport accepted) { + SelectionKey clientKey = null; + try { + clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ); + + FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this); + clientKey.attach(frameBuffer); + } catch (IOException e) { + LOGGER.warn("Failed to register accepted connection to selector!", e); + if (clientKey != null) { + cleanupSelectionKey(clientKey); + } + accepted.close(); + } + } + } // SelectorThread + + /** + * Creates a SelectorThreadLoadBalancer to be used by the accept thread for + * assigning newly accepted connections across the threads. + */ + protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection threads) { + return new SelectorThreadLoadBalancer(threads); + } + + /** + * A round robin load balancer for choosing selector threads for new + * connections. 
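+ * nextThread() is only ever called from the single accept thread, so the + * iterator is used without synchronization.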
+ */ + protected class SelectorThreadLoadBalancer { + private final Collection threads; + private Iterator nextThreadIterator; + + public SelectorThreadLoadBalancer(Collection threads) { + if (threads.isEmpty()) { + throw new IllegalArgumentException("At least one selector thread is required"); + } + this.threads = Collections.unmodifiableList(new ArrayList(threads)); + nextThreadIterator = this.threads.iterator(); + } + + public SelectorThread nextThread() { + // Choose a selector thread (round robin) + if (!nextThreadIterator.hasNext()) { + nextThreadIterator = threads.iterator(); + } + return nextThreadIterator.next(); + } + } +} diff --git src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java index 3fa5d41..0ed6061 100644 --- src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java +++ src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java @@ -18,89 +18,29 @@ package org.apache.hadoop.hbase.thrift; -import static org.apache.hadoop.hbase.util.Bytes.getBytes; - -import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.TreeMap; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.ParseFilter; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Strings; -import org.apache.hadoop.net.DNS; -import org.apache.hadoop.hbase.filter.PrefixFilter; -import org.apache.hadoop.hbase.filter.WhileMatchFilter; -import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; -import org.apache.hadoop.hbase.thrift.generated.BatchMutation; -import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; -import org.apache.hadoop.hbase.thrift.generated.Hbase; -import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface; -import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor; -import org.apache.hadoop.hbase.thrift.generated.IOError; -import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; -import org.apache.hadoop.hbase.thrift.generated.Mutation; -import org.apache.hadoop.hbase.thrift.generated.TCell; -import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; -import org.apache.hadoop.hbase.thrift.generated.TRowResult; 
-import org.apache.hadoop.hbase.thrift.generated.TScan; -import org.apache.hadoop.hbase.util.Addressing; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.util.Shell.ExitCodeException; -import org.apache.thrift.TException; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.server.THsHaServer; -import org.apache.thrift.server.TNonblockingServer; -import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TServer.AbstractServerArgs; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TNonblockingServerSocket; -import org.apache.thrift.transport.TNonblockingServerTransport; -import org.apache.thrift.transport.TServerSocket; -import org.apache.thrift.transport.TServerTransport; -import org.apache.thrift.transport.TTransportFactory; - -import com.google.common.base.Joiner; /** - * ThriftServer - this class starts up a Thrift server which implements the - * Hbase API specified in the Hbase.thrift IDL file. + * ThriftServer- this class starts up a Thrift server which implements the + * Hbase API specified in the Hbase.thrift IDL file. The server runs in an + * independent process. */ public class ThriftServer { @@ -119,968 +59,16 @@ public class ThriftServer { private static final int DEFAULT_LISTEN_PORT = 9090; private Configuration conf; - TServer server; - - /** An enum of server implementation selections */ - enum ImplType { - HS_HA("hsha", true, THsHaServer.class, false), - NONBLOCKING("nonblocking", true, TNonblockingServer.class, false), - THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true); - - public static final ImplType DEFAULT = THREAD_POOL; - - final String option; - final boolean isAlwaysFramed; - final Class serverClass; - final boolean canSpecifyBindIP; - - ImplType(String option, boolean isAlwaysFramed, - Class serverClass, boolean canSpecifyBindIP) { - this.option = option; - this.isAlwaysFramed = isAlwaysFramed; - this.serverClass = serverClass; - this.canSpecifyBindIP = canSpecifyBindIP; - } - - /** - * @return -option so we can get the list of options from - * {@link #values()} - */ - @Override - public String toString() { - return "-" + option; - } - - String getDescription() { - StringBuilder sb = new StringBuilder("Use the " + - serverClass.getSimpleName()); - if (isAlwaysFramed) { - sb.append(" This implies the framed transport."); - } - if (this == DEFAULT) { - sb.append("This is the default."); - } - return sb.toString(); - } - - static OptionGroup createOptionGroup() { - OptionGroup group = new OptionGroup(); - for (ImplType t : values()) { - group.addOption(new Option(t.option, t.getDescription())); - } - return group; - } - - static ImplType getServerImpl(CommandLine cmd) { - ImplType chosenType = null; - int numChosen = 0; - for (ImplType t : values()) { - if (cmd.hasOption(t.option)) { - chosenType = t; - ++numChosen; - } - } - if (numChosen != 1) { - throw new AssertionError("Exactly one option out of " + - Arrays.toString(values()) + " has to be specified"); - } - return chosenType; - } - - public String simpleClassName() { - return serverClass.getSimpleName(); - } - - public static List serversThatCannotSpecifyBindIP() { - List l = new ArrayList(); - for (ImplType t : values()) { - 
if (!t.canSpecifyBindIP) { - l.add(t.simpleClassName()); - } - } - return l; - } - - } - - /** - * The HBaseHandler is a glue object that connects Thrift RPC calls to the - * HBase client API primarily defined in the HBaseAdmin and HTable objects. - */ - public static class HBaseHandler implements Hbase.Iface { - protected Configuration conf; - protected HBaseAdmin admin = null; - protected final Log LOG = LogFactory.getLog(this.getClass().getName()); - - // nextScannerId and scannerMap are used to manage scanner state - protected int nextScannerId = 0; - protected HashMap scannerMap = null; - - private static ThreadLocal> threadLocalTables = new ThreadLocal>() { - @Override - protected Map initialValue() { - return new TreeMap(); - } - }; - - /** - * Returns a list of all the column families for a given htable. - * - * @param table - * @return - * @throws IOException - */ - byte[][] getAllColumns(HTable table) throws IOException { - HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); - byte[][] columns = new byte[cds.length][]; - for (int i = 0; i < cds.length; i++) { - columns[i] = Bytes.add(cds[i].getName(), - KeyValue.COLUMN_FAMILY_DELIM_ARRAY); - } - return columns; - } - - /** - * Creates and returns an HTable instance from a given table name. - * - * @param tableName - * name of table - * @return HTable object - * @throws IOException - * @throws IOError - */ - protected HTable getTable(final byte[] tableName) throws - IOException { - String table = new String(tableName); - Map tables = threadLocalTables.get(); - if (!tables.containsKey(table)) { - tables.put(table, new HTable(conf, tableName)); - } - return tables.get(table); - } - - protected HTable getTable(final ByteBuffer tableName) throws IOException { - return getTable(getBytes(tableName)); - } - - /** - * Assigns a unique ID to the scanner and adds the mapping to an internal - * hash-map. - * - * @param scanner - * @return integer scanner id - */ - protected synchronized int addScanner(ResultScanner scanner) { - int id = nextScannerId++; - scannerMap.put(id, scanner); - return id; - } - - /** - * Returns the scanner associated with the specified ID. - * - * @param id - * @return a Scanner, or null if ID was invalid. - */ - protected synchronized ResultScanner getScanner(int id) { - return scannerMap.get(id); - } - - /** - * Removes the scanner associated with the specified ID from the internal - * id->scanner hash-map. - * - * @param id - * @return a Scanner, or null if ID was invalid. - */ - protected synchronized ResultScanner removeScanner(int id) { - return scannerMap.remove(id); - } - - /** - * Constructs an HBaseHandler object. 
- * @throws IOException - */ - protected HBaseHandler() - throws IOException { - this(HBaseConfiguration.create()); - } - - protected HBaseHandler(final Configuration c) - throws IOException { - this.conf = c; - admin = new HBaseAdmin(conf); - scannerMap = new HashMap(); - } - - @Override - public void enableTable(ByteBuffer tableName) throws IOError { - try{ - admin.enableTable(getBytes(tableName)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void disableTable(ByteBuffer tableName) throws IOError{ - try{ - admin.disableTable(getBytes(tableName)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public boolean isTableEnabled(ByteBuffer tableName) throws IOError { - try { - return HTable.isTableEnabled(this.conf, getBytes(tableName)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void compact(ByteBuffer tableNameOrRegionName) throws IOError { - try{ - admin.compact(getBytes(tableNameOrRegionName)); - } catch (InterruptedException e) { - throw new IOError(e.getMessage()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { - try{ - admin.majorCompact(getBytes(tableNameOrRegionName)); - } catch (InterruptedException e) { - throw new IOError(e.getMessage()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getTableNames() throws IOError { - try { - HTableDescriptor[] tables = this.admin.listTables(); - ArrayList list = new ArrayList(tables.length); - for (int i = 0; i < tables.length; i++) { - list.add(ByteBuffer.wrap(tables[i].getName())); - } - return list; - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getTableRegions(ByteBuffer tableName) - throws IOError { - try{ - List hris = this.admin.getTableRegions(tableName.array()); - List regions = new ArrayList(); - - if (hris != null) { - for (HRegionInfo regionInfo : hris){ - TRegionInfo region = new TRegionInfo(); - region.startKey = ByteBuffer.wrap(regionInfo.getStartKey()); - region.endKey = ByteBuffer.wrap(regionInfo.getEndKey()); - region.id = regionInfo.getRegionId(); - region.name = ByteBuffer.wrap(regionInfo.getRegionName()); - region.version = regionInfo.getVersion(); - regions.add(region); - } - } - return regions; - } catch (IOException e){ - throw new IOError(e.getMessage()); - } - } - - @Deprecated - @Override - public List get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) - throws IOError { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return get(tableName, row, famAndQf[0], new byte[0]); - } - return get(tableName, row, famAndQf[0], famAndQf[1]); - } - - protected List get(ByteBuffer tableName, - ByteBuffer row, - byte[] family, - byte[] qualifier) throws IOError { - try { - HTable table = getTable(tableName); - Get get = new Get(getBytes(row)); - if (qualifier == null || qualifier.length == 0) { - get.addFamily(family); - } else { - get.addColumn(family, qualifier); - } - Result result = table.get(get); - return ThriftUtilities.cellFromHBase(result.raw()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Deprecated - @Override - public List getVer(ByteBuffer tableName, ByteBuffer row, - ByteBuffer column, int numVersions) throws IOError { - byte [][] famAndQf = 
KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return getVer(tableName, row, famAndQf[0], - new byte[0], numVersions); - } - return getVer(tableName, row, - famAndQf[0], famAndQf[1], numVersions); - } - - public List getVer(ByteBuffer tableName, ByteBuffer row, - byte[] family, - byte[] qualifier, int numVersions) throws IOError { - try { - HTable table = getTable(tableName); - Get get = new Get(getBytes(row)); - get.addColumn(family, qualifier); - get.setMaxVersions(numVersions); - Result result = table.get(get); - return ThriftUtilities.cellFromHBase(result.raw()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Deprecated - @Override - public List getVerTs(ByteBuffer tableName, - ByteBuffer row, - ByteBuffer column, - long timestamp, - int numVersions) throws IOError { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp, - numVersions); - } - return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, - numVersions); - } - - protected List getVerTs(ByteBuffer tableName, - ByteBuffer row, byte [] family, - byte [] qualifier, long timestamp, int numVersions) throws IOError { - try { - HTable table = getTable(tableName); - Get get = new Get(getBytes(row)); - get.addColumn(family, qualifier); - get.setTimeRange(Long.MIN_VALUE, timestamp); - get.setMaxVersions(numVersions); - Result result = table.get(get); - return ThriftUtilities.cellFromHBase(result.raw()); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getRow(ByteBuffer tableName, ByteBuffer row) - throws IOError { - return getRowWithColumnsTs(tableName, row, null, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowWithColumns(ByteBuffer tableName, - ByteBuffer row, - List columns) throws IOError { - return getRowWithColumnsTs(tableName, row, columns, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowTs(ByteBuffer tableName, ByteBuffer row, - long timestamp) throws IOError { - return getRowWithColumnsTs(tableName, row, null, - timestamp); - } - - @Override - public List getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row, - List columns, long timestamp) throws IOError { - try { - HTable table = getTable(tableName); - if (columns == null) { - Get get = new Get(getBytes(row)); - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); - return ThriftUtilities.rowResultFromHBase(result); - } - Get get = new Get(getBytes(row)); - for(ByteBuffer column : columns) { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if (famAndQf.length == 1) { - get.addFamily(famAndQf[0]); - } else { - get.addColumn(famAndQf[0], famAndQf[1]); - } - } - get.setTimeRange(Long.MIN_VALUE, timestamp); - Result result = table.get(get); - return ThriftUtilities.rowResultFromHBase(result); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public List getRows(ByteBuffer tableName, - List rows) - throws IOError { - return getRowsWithColumnsTs(tableName, rows, null, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowsWithColumns(ByteBuffer tableName, - List rows, - List columns) throws IOError { - return getRowsWithColumnsTs(tableName, rows, columns, - HConstants.LATEST_TIMESTAMP); - } - - @Override - public List getRowsTs(ByteBuffer tableName, - List rows, - long timestamp) throws IOError { - return 
getRowsWithColumnsTs(tableName, rows, null, - timestamp); - } - - @Override - public List getRowsWithColumnsTs(ByteBuffer tableName, - List rows, - List columns, long timestamp) throws IOError { - try { - List gets = new ArrayList(rows.size()); - HTable table = getTable(tableName); - for (ByteBuffer row : rows) { - Get get = new Get(getBytes(row)); - if (columns != null) { - - for(ByteBuffer column : columns) { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if (famAndQf.length == 1) { - get.addFamily(famAndQf[0]); - } else { - get.addColumn(famAndQf[0], famAndQf[1]); - } - } - get.setTimeRange(Long.MIN_VALUE, timestamp); - } - gets.add(get); - } - Result[] result = table.get(gets); - return ThriftUtilities.rowResultFromHBase(result); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) - throws IOError { - deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void deleteAllTs(ByteBuffer tableName, - ByteBuffer row, - ByteBuffer column, - long timestamp) throws IOError { - try { - HTable table = getTable(tableName); - Delete delete = new Delete(getBytes(row)); - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if (famAndQf.length == 1) { - delete.deleteFamily(famAndQf[0], timestamp); - } else { - delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); - } - table.delete(delete); + ThriftServerRunner serverRunner; - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void deleteAllRow(ByteBuffer tableName, ByteBuffer row) throws IOError { - deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp) - throws IOError { - try { - HTable table = getTable(tableName); - Delete delete = new Delete(getBytes(row), timestamp, null); - table.delete(delete); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void createTable(ByteBuffer in_tableName, - List columnFamilies) throws IOError, - IllegalArgument, AlreadyExists { - byte [] tableName = getBytes(in_tableName); - try { - if (admin.tableExists(tableName)) { - throw new AlreadyExists("table name already in use"); - } - HTableDescriptor desc = new HTableDescriptor(tableName); - for (ColumnDescriptor col : columnFamilies) { - HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); - desc.addFamily(colDesc); - } - admin.createTable(desc); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } catch (IllegalArgumentException e) { - throw new IllegalArgument(e.getMessage()); - } - } - - @Override - public void deleteTable(ByteBuffer in_tableName) throws IOError { - byte [] tableName = getBytes(in_tableName); - if (LOG.isDebugEnabled()) { - LOG.debug("deleteTable: table=" + Bytes.toString(tableName)); - } - try { - if (!admin.tableExists(tableName)) { - throw new IOError("table does not exist"); - } - admin.deleteTable(tableName); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public void mutateRow(ByteBuffer tableName, ByteBuffer row, - List mutations) throws IOError, IllegalArgument { - mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, - List mutations, long timestamp) throws IOError, IllegalArgument { - HTable 
table = null; - try { - table = getTable(tableName); - Put put = new Put(getBytes(row), timestamp, null); - - Delete delete = new Delete(getBytes(row)); - - // I apologize for all this mess :) - for (Mutation m : mutations) { - byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); - if (m.isDelete) { - if (famAndQf.length == 1) { - delete.deleteFamily(famAndQf[0], timestamp); - } else { - delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); - } - } else { - if(famAndQf.length == 1) { - put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, - m.value != null ? m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } else { - put.add(famAndQf[0], famAndQf[1], - m.value != null ? m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } - } - } - if (!delete.isEmpty()) - table.delete(delete); - if (!put.isEmpty()) - table.put(put); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } catch (IllegalArgumentException e) { - throw new IllegalArgument(e.getMessage()); - } - } - - @Override - public void mutateRows(ByteBuffer tableName, List rowBatches) - throws IOError, IllegalArgument, TException { - mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP); - } - - @Override - public void mutateRowsTs(ByteBuffer tableName, List rowBatches, long timestamp) - throws IOError, IllegalArgument, TException { - List puts = new ArrayList(); - List deletes = new ArrayList(); - - for (BatchMutation batch : rowBatches) { - byte[] row = getBytes(batch.row); - List mutations = batch.mutations; - Delete delete = new Delete(row); - Put put = new Put(row, timestamp, null); - for (Mutation m : mutations) { - byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); - if (m.isDelete) { - // no qualifier, family only. - if (famAndQf.length == 1) { - delete.deleteFamily(famAndQf[0], timestamp); - } else { - delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); - } - } else { - if(famAndQf.length == 1) { - put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, - m.value != null ? m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } else { - put.add(famAndQf[0], famAndQf[1], - m.value != null ? 
m.value.array() - : HConstants.EMPTY_BYTE_ARRAY); - } - } - } - if (!delete.isEmpty()) - deletes.add(delete); - if (!put.isEmpty()) - puts.add(put); - } - - HTable table = null; - try { - table = getTable(tableName); - if (!puts.isEmpty()) - table.put(puts); - for (Delete del : deletes) { - table.delete(del); - } - } catch (IOException e) { - throw new IOError(e.getMessage()); - } catch (IllegalArgumentException e) { - throw new IllegalArgument(e.getMessage()); - } - } - - @Deprecated - @Override - public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, - long amount) throws IOError, IllegalArgument, TException { - byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); - if(famAndQf.length == 1) { - return atomicIncrement(tableName, row, famAndQf[0], new byte[0], - amount); - } - return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); - } - - protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte [] family, - byte [] qualifier, long amount) - throws IOError, IllegalArgument, TException { - HTable table; - try { - table = getTable(tableName); - return table.incrementColumnValue(getBytes(row), family, qualifier, amount); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - public void scannerClose(int id) throws IOError, IllegalArgument { - LOG.debug("scannerClose: id=" + id); - ResultScanner scanner = getScanner(id); - if (scanner == null) { - throw new IllegalArgument("scanner ID is invalid"); - } - scanner.close(); - removeScanner(id); - } - - @Override - public List scannerGetList(int id,int nbRows) throws IllegalArgument, IOError { - LOG.debug("scannerGetList: id=" + id); - ResultScanner scanner = getScanner(id); - if (null == scanner) { - throw new IllegalArgument("scanner ID is invalid"); - } - - Result [] results = null; - try { - results = scanner.next(nbRows); - if (null == results) { - return new ArrayList(); - } - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - return ThriftUtilities.rowResultFromHBase(results); - } - - @Override - public List scannerGet(int id) throws IllegalArgument, IOError { - return scannerGetList(id,1); - } - - public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(); - if (tScan.isSetStartRow()) { - scan.setStartRow(tScan.getStartRow()); - } - if (tScan.isSetStopRow()) { - scan.setStopRow(tScan.getStopRow()); - } - if (tScan.isSetTimestamp()) { - scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp()); - } - if (tScan.isSetCaching()) { - scan.setCaching(tScan.getCaching()); - } - if (tScan.isSetColumns() && tScan.getColumns().size() != 0) { - for(ByteBuffer column : tScan.getColumns()) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - if (tScan.isSetFilterString()) { - ParseFilter parseFilter = new ParseFilter(); - scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString())); - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, - List columns) throws IOError { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow)); - if(columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { - byte [][] famQf = 
KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, - ByteBuffer stopRow, List columns) throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); - if(columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenWithPrefix(ByteBuffer tableName, - ByteBuffer startAndPrefix, - List columns) - throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startAndPrefix)); - Filter f = new WhileMatchFilter( - new PrefixFilter(getBytes(startAndPrefix))); - scan.setFilter(f); - if (columns != null && columns.size() != 0) { - for(ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, - List columns, long timestamp) throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow)); - scan.setTimeRange(Long.MIN_VALUE, timestamp); - if (columns != null && columns.size() != 0) { - for (ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, - ByteBuffer stopRow, List columns, long timestamp) - throws IOError, TException { - try { - HTable table = getTable(tableName); - Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); - scan.setTimeRange(Long.MIN_VALUE, timestamp); - if (columns != null && columns.size() != 0) { - for (ByteBuffer column : columns) { - byte [][] famQf = KeyValue.parseColumn(getBytes(column)); - if(famQf.length == 1) { - scan.addFamily(famQf[0]); - } else { - scan.addColumn(famQf[0], famQf[1]); - } - } - } - scan.setTimeRange(Long.MIN_VALUE, timestamp); - return addScanner(table.getScanner(scan)); - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override - public Map getColumnDescriptors( - ByteBuffer tableName) throws IOError, TException { - try { - TreeMap columns = - new TreeMap(); - - HTable table = getTable(tableName); - HTableDescriptor desc = table.getTableDescriptor(); - - for (HColumnDescriptor e : desc.getFamilies()) { - ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); - columns.put(col.name, col); - } - return columns; - } catch (IOException e) { - throw new IOError(e.getMessage()); - } - } - - @Override 
-    public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
-        ByteBuffer family) throws IOError {
-      try {
-        HTable table = getTable(getBytes(tableName));
-        Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
-        return ThriftUtilities.cellFromHBase(result.raw());
-      } catch (IOException e) {
-        throw new IOError(e.getMessage());
-      }
-    }
-
-    @Override
-    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
-      try {
-        HTable table = getTable(HConstants.META_TABLE_NAME);
-        Result startRowResult = table.getRowOrBefore(
-            searchRow.array(), HConstants.CATALOG_FAMILY);
-
-        if (startRowResult == null) {
-          throw new IOException("Cannot find row in .META., row="
-              + Bytes.toString(searchRow.array()));
-        }
-
-        // find region start and end keys
-        byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
-            HConstants.REGIONINFO_QUALIFIER);
-        if (value == null || value.length == 0) {
-          throw new IOException("HRegionInfo REGIONINFO was null or " +
-              " empty in Meta for row="
-              + Bytes.toString(searchRow.array()));
-        }
-        HRegionInfo regionInfo = Writables.getHRegionInfo(value);
-        TRegionInfo region = new TRegionInfo();
-        region.setStartKey(regionInfo.getStartKey());
-        region.setEndKey(regionInfo.getEndKey());
-        region.id = regionInfo.getRegionId();
-        region.setName(regionInfo.getRegionName());
-        region.version = regionInfo.getVersion();
-
-        // find region assignment to server
-        value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
-            HConstants.SERVER_QUALIFIER);
-        if (value != null && value.length > 0) {
-          String hostAndPort = Bytes.toString(value);
-          region.setServerName(Bytes.toBytes(
-              Addressing.parseHostname(hostAndPort)));
-          region.port = Addressing.parsePort(hostAndPort);
-        }
-        return region;
-      } catch (IOException e) {
-        throw new IOError(e.getMessage());
-      }
-    }
-  }
+
+  //
+  // Main program and support routines
+  //
 
   public ThriftServer(Configuration conf) {
     this.conf = HBaseConfiguration.create(conf);
   }
 
-  //
-  // Main program and support routines
-  //
-
   private static void printUsageAndExit(Options options, int exitCode)
       throws ExitCodeException {
     HelpFormatter formatter = new HelpFormatter();
@@ -1092,11 +80,20 @@ public class ThriftServer {
     throw new ExitCodeException(exitCode, "");
   }
 
-  /*
+  /**
    * Starts up or shuts down the Thrift server, depending on the arguments.
    * @param args
    */
   void doMain(final String[] args) throws Exception {
+    processOptions(args);
+    serverRunner = new ThriftServerRunner(conf);
+    serverRunner.run();
+  }
+
+  /**
+   * Parses the command line options to set parameters in the conf.
+   */
+  private void processOptions(final String[] args) throws Exception {
     Options options = new Options();
     options.addOption("b", BIND_OPTION, true, "Address to bind " +
         "the Thrift server to. 
Not supported by the Nonblocking and " + @@ -1143,10 +140,10 @@ public class ThriftServer { } // Get port to bind to - int listenPort = 0; try { - listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION, + int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION, String.valueOf(DEFAULT_LISTEN_PORT))); + conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort); } catch (NumberFormatException e) { LOG.error("Could not parse the value provided for the port option", e); printUsageAndExit(options, -1); @@ -1162,103 +159,14 @@ public class ThriftServer { optionToConf(cmd, KEEP_ALIVE_SEC_OPTION, conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY); - // Construct correct ProtocolFactory - TProtocolFactory protocolFactory; - if (cmd.hasOption(COMPACT_OPTION)) { - LOG.debug("Using compact protocol"); - protocolFactory = new TCompactProtocol.Factory(); - } else { - LOG.debug("Using binary protocol"); - protocolFactory = new TBinaryProtocol.Factory(); - } - - HBaseHandler handler = new HBaseHandler(conf); - Hbase.Processor processor = - new Hbase.Processor(handler); - ImplType implType = ImplType.getServerImpl(cmd); - - // Construct correct TransportFactory - TTransportFactory transportFactory; - if (cmd.hasOption(FRAMED_OPTION) || implType.isAlwaysFramed) { - transportFactory = new TFramedTransport.Factory(); - LOG.debug("Using framed transport"); - } else { - transportFactory = new TTransportFactory(); - } - - if (cmd.hasOption(BIND_OPTION) && !implType.canSpecifyBindIP) { - LOG.error("Server types " + Joiner.on(", ").join( - ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " + - "address binding at the moment. See " + - "https://issues.apache.org/jira/browse/HBASE-2155 for details."); - printUsageAndExit(options, -1); - } - - if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING) { - if (cmd.hasOption(BIND_OPTION)) { - throw new RuntimeException("-" + BIND_OPTION + " not supported with " + - implType); - } - - TNonblockingServerTransport serverTransport = - new TNonblockingServerSocket(listenPort); - - if (implType == ImplType.NONBLOCKING) { - TNonblockingServer.Args serverArgs = - new TNonblockingServer.Args(serverTransport); - setServerArgs(serverArgs, processor, transportFactory, - protocolFactory); - server = new TNonblockingServer(serverArgs); - } else { - THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); - serverArgs.processor(processor); - serverArgs.transportFactory(transportFactory); - serverArgs.protocolFactory(protocolFactory); - server = new THsHaServer(serverArgs); - } - LOG.info("starting HBase " + implType.simpleClassName() + - " server on " + Integer.toString(listenPort)); - } else if (implType == ImplType.THREAD_POOL) { - // Thread pool server. Get the IP address to bind to. 
- InetAddress listenAddress = getBindAddress(options, cmd); - - TServerTransport serverTransport = new TServerSocket( - new InetSocketAddress(listenAddress, listenPort)); - - TBoundedThreadPoolServer.Args serverArgs = new TBoundedThreadPoolServer.Args( - serverTransport, conf); - setServerArgs(serverArgs, processor, transportFactory, protocolFactory); - LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " - + listenAddress + ":" + Integer.toString(listenPort) - + "; " + serverArgs); - server = new TBoundedThreadPoolServer(serverArgs); - } else { - throw new AssertionError("Unsupported Thrift server implementation: " + - implType.simpleClassName()); - } - - // A sanity check that we instantiated the right type of server. - if (server.getClass() != implType.serverClass) { - throw new AssertionError("Expected to create Thrift server class " + - implType.serverClass.getName() + " but got " + - server.getClass().getName()); - } - - // login the server principal (if using secure Hadoop) - Configuration conf = handler.conf; - if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) { - String machineName = Strings.domainNamePointerToHostName( - DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"), - conf.get("hbase.thrift.dns.nameserver", "default"))); - User.login(conf, "hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", - machineName); + // Set general thrift server options + conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, cmd.hasOption(COMPACT_OPTION)); + conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, cmd.hasOption(FRAMED_OPTION)); + if (cmd.hasOption(BIND_OPTION)) { + conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION)); } - server.serve(); - } - - public void stop() { - server.stop(); + ImplType.setServerImpl(cmd, conf); } private InetAddress getBindAddress(Options options, CommandLine cmd) @@ -1275,18 +183,16 @@ public class ThriftServer { return listenAddress; } - private static void setServerArgs(AbstractServerArgs serverArgs, - Processor processor, TTransportFactory transportFactory, - TProtocolFactory protocolFactory) { - serverArgs.processor(processor); - serverArgs.transportFactory(transportFactory); - serverArgs.protocolFactory(protocolFactory); + public void stop() { + serverRunner.shutdown(); } private static void optionToConf(CommandLine cmd, String option, Configuration conf, String destConfKey) { if (cmd.hasOption(option)) { - conf.set(destConfKey, cmd.getOptionValue(option)); + String value = cmd.getOptionValue(option); + LOG.info("Set configuration key:" + destConfKey + " value:" + value); + conf.set(destConfKey, value); } } diff --git src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java new file mode 100644 index 0000000..52a76ba --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -0,0 +1,1226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import static org.apache.hadoop.hbase.util.Bytes.getBytes; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; +import org.apache.hadoop.hbase.thrift.generated.BatchMutation; +import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift.generated.Hbase.Iface; +import org.apache.hadoop.hbase.thrift.generated.Hbase.Processor; +import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; +import org.apache.hadoop.hbase.thrift.generated.IOError; +import org.apache.hadoop.hbase.thrift.generated.Mutation; +import org.apache.hadoop.hbase.thrift.generated.TCell; +import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; +import org.apache.hadoop.hbase.thrift.generated.TRowResult; +import org.apache.hadoop.hbase.thrift.generated.TScan; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.net.DNS; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.THsHaServer; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TServer.AbstractServerArgs; +import 
org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.base.Joiner;
+
+/**
+ * ThriftServerRunner - this class starts up a Thrift server which implements
+ * the Hbase API specified in the Hbase.thrift IDL file.
+ */
+public class ThriftServerRunner implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
+
+  static final String SERVER_TYPE_CONF_KEY =
+      "hadoop.regionserver.thrift.server.type";
+  static final String BIND_CONF_KEY = "hadoop.regionserver.thrift.ipaddress";
+  static final String COMPACT_CONF_KEY = "hadoop.regionserver.thrift.compact";
+  static final String FRAMED_CONF_KEY = "hadoop.regionserver.thrift.framed";
+  static final String PORT_CONF_KEY = "hadoop.regionserver.thrift.port";
+
+  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
+  private static final int DEFAULT_LISTEN_PORT = 9090;
+
+  private Configuration conf;
+  volatile TServer tserver;
+  private final HBaseHandler handler;
+
+  /** An enum of server implementation selections */
+  enum ImplType {
+    HS_HA("hsha", true, THsHaServer.class, false),
+    NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
+    THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
+    THREADED_SELECTOR(
+        "threadedselector", true, TThreadedSelectorServer.class, false);
+
+    public static final ImplType DEFAULT = THREAD_POOL;
+
+    final String option;
+    final boolean isAlwaysFramed;
+    final Class<? extends TServer> serverClass;
+    final boolean canSpecifyBindIP;
+
+    ImplType(String option, boolean isAlwaysFramed,
+        Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
+      this.option = option;
+      this.isAlwaysFramed = isAlwaysFramed;
+      this.serverClass = serverClass;
+      this.canSpecifyBindIP = canSpecifyBindIP;
+    }
+
+    /**
+     * @return -option so we can get the list of options from
+     * {@link #values()}
+     */
+    @Override
+    public String toString() {
+      return "-" + option;
+    }
+
+    String getDescription() {
+      StringBuilder sb = new StringBuilder("Use the " +
+          serverClass.getSimpleName() + ".");
+      if (isAlwaysFramed) {
+        sb.append(" This implies the framed transport.");
+      }
+      if (this == DEFAULT) {
+        sb.append(" This is the default.");
+      }
+      return sb.toString();
+    }
+
+    static OptionGroup createOptionGroup() {
+      OptionGroup group = new OptionGroup();
+      for (ImplType t : values()) {
+        group.addOption(new Option(t.option, t.getDescription()));
+      }
+      return group;
+    }
+
+    static ImplType getServerImpl(Configuration conf) {
+      String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
+      for (ImplType t : values()) {
+        if (confType.equals(t.option)) {
+          return t;
+        }
+      }
+      throw new AssertionError("Unknown server type configured: " + confType);
+    }
+
+    static void setServerImpl(CommandLine cmd, Configuration conf) {
+      ImplType chosenType = null;
+      int numChosen = 0;
+      for (ImplType t : values()) {
+        if (cmd.hasOption(t.option)) {
+          chosenType = t;
+          ++numChosen;
+        }
+      }
+      if (numChosen != 1) {
+        throw new AssertionError("Exactly one option out of " +
+            Arrays.toString(values()) + " has to be specified");
+      }
+      LOG.info("Setting thrift server to " + chosenType.option);
+      conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
+    }
+
+    public String simpleClassName() {
+      return serverClass.getSimpleName();
+    }
+
+    public static List<String> serversThatCannotSpecifyBindIP() {
+      List<String> l = new ArrayList<String>();
+      for (ImplType t : values()) {
+        if (!t.canSpecifyBindIP) {
+          l.add(t.simpleClassName());
+        }
+      }
+      return l;
+    }
+
+  }
+
+  public ThriftServerRunner(Configuration conf) throws IOException {
+    this(conf, new ThriftServerRunner.HBaseHandler(conf));
+  }
+
+  public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
+    this.conf = HBaseConfiguration.create(conf);
+    this.handler = handler;
+  }
+
+  /**
+   * Runs the Thrift server.
+   */
+  @Override
+  public void run() {
+    try {
+      setupServer();
+      tserver.serve();
+    } catch (Exception e) {
+      // Include the cause in the log; the bare message alone makes
+      // startup failures hard to diagnose.
+      LOG.fatal("Cannot run ThriftServer", e);
+      // Crash the process if the ThriftServer is not running
+      System.exit(-1);
+    }
+  }
+
+  public void shutdown() {
+    if (tserver != null) {
+      tserver.stop();
+    }
+  }
+
+  /**
+   * Sets up the Thrift TServer according to the configuration.
+   */
+  private void setupServer() throws Exception {
+    // Get the port to bind to
+    int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+
+    // Construct the correct ProtocolFactory
+    TProtocolFactory protocolFactory;
+    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
+      LOG.debug("Using compact protocol");
+      protocolFactory = new TCompactProtocol.Factory();
+    } else {
+      LOG.debug("Using binary protocol");
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+
+    Hbase.Processor<Hbase.Iface> processor =
+        new Hbase.Processor<Hbase.Iface>(handler);
+    ImplType implType = ImplType.getServerImpl(conf);
+
+    // Construct the correct TransportFactory
+    TTransportFactory transportFactory;
+    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
+      transportFactory = new TFramedTransport.Factory();
+      LOG.debug("Using framed transport");
+    } else {
+      transportFactory = new TTransportFactory();
+    }
+
+    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+      LOG.error("Server types " + Joiner.on(", ").join(
+          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
+          "address binding at the moment. See " +
+          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
+      throw new RuntimeException(BIND_CONF_KEY +
+          " is not supported with server type " + implType);
+    }
+
+    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+        implType == ImplType.THREADED_SELECTOR) {
+
+      TNonblockingServerTransport serverTransport =
+          new TNonblockingServerSocket(listenPort);
+
+      if (implType == ImplType.NONBLOCKING) {
+        TNonblockingServer.Args serverArgs =
+            new TNonblockingServer.Args(serverTransport);
+        setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
+        tserver = new TNonblockingServer(serverArgs);
+      } else if (implType == ImplType.HS_HA) {
+        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+        setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
+        tserver = new THsHaServer(serverArgs);
+      } else { // THREADED_SELECTOR
+        // TThreadedSelectorServer.Args takes only the transport; the
+        // Configuration is not part of its constructor signature.
+        TThreadedSelectorServer.Args serverArgs =
+            new TThreadedSelectorServer.Args(serverTransport);
+        setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
+        tserver = new TThreadedSelectorServer(serverArgs);
+      }
+      LOG.info("starting HBase " + implType.simpleClassName() +
+          " server on " + Integer.toString(listenPort));
+    } else if (implType == ImplType.THREAD_POOL) {
+      // Thread pool server. Get the IP address to bind to.
+      InetAddress listenAddress = getBindAddress(conf);
+
+      TServerTransport serverTransport = new TServerSocket(
+          new InetSocketAddress(listenAddress, listenPort));
+
+      TBoundedThreadPoolServer.Args serverArgs =
+          new TBoundedThreadPoolServer.Args(serverTransport, conf);
+      setServerArgs(serverArgs, processor, transportFactory, protocolFactory);
+      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+          + listenAddress + ":" + Integer.toString(listenPort)
+          + "; " + serverArgs);
+      tserver = new TBoundedThreadPoolServer(serverArgs);
+    } else {
+      throw new AssertionError("Unsupported Thrift server implementation: " +
+          implType.simpleClassName());
+    }
+
+    // A sanity check that we instantiated the right type of server.
+    if (tserver.getClass() != implType.serverClass) {
+      throw new AssertionError("Expected to create Thrift server class " +
+          implType.serverClass.getName() + " but got " +
+          tserver.getClass().getName());
+    }
+
+    // login the server principal (if using secure Hadoop)
+    // Use the handler's Configuration, which carries the security settings,
+    // under a name that does not shadow this.conf.
+    Configuration handlerConf = handler.conf;
+    if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(handlerConf)) {
+      String machineName = Strings.domainNamePointerToHostName(
+          DNS.getDefaultHost(
+              handlerConf.get("hbase.thrift.dns.interface", "default"),
+              handlerConf.get("hbase.thrift.dns.nameserver", "default")));
+      User.login(handlerConf, "hbase.thrift.keytab.file",
+          "hbase.thrift.kerberos.principal", machineName);
+    }
+  }
+
+  private InetAddress getBindAddress(Configuration conf) throws IOException {
+    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
+    return InetAddress.getByName(bindAddressStr);
+  }
+
+  private static void setServerArgs(AbstractServerArgs serverArgs,
+      Processor processor, TTransportFactory transportFactory,
+      TProtocolFactory protocolFactory) {
+    serverArgs.processor(processor);
+    serverArgs.transportFactory(transportFactory);
+    serverArgs.protocolFactory(protocolFactory);
+  }
+
+  /**
+   * The HBaseHandler is a glue object that connects Thrift RPC calls to the
+   * HBase client API primarily defined in the HBaseAdmin and HTable objects.
+   */
+  public static class HBaseHandler implements Hbase.Iface {
+    protected Configuration conf;
+    protected HBaseAdmin admin = null;
+    protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+    // nextScannerId and scannerMap are used to manage scanner state
+    protected int nextScannerId = 0;
+    protected HashMap<Integer, ResultScanner> scannerMap = null;
+
+    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
+        new ThreadLocal<Map<String, HTable>>() {
+      @Override
+      protected Map<String, HTable> initialValue() {
+        return new TreeMap<String, HTable>();
+      }
+    };
+
+    /**
+     * Returns a list of all the column families for a given htable.
+     *
+     * @param table
+     * @return
+     * @throws IOException
+     */
+    byte[][] getAllColumns(HTable table) throws IOException {
+      HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
+      byte[][] columns = new byte[cds.length][];
+      for (int i = 0; i < cds.length; i++) {
+        columns[i] = Bytes.add(cds[i].getName(),
+            KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
+      }
+      return columns;
+    }
+
+    /**
+     * Creates and returns an HTable instance from a given table name.
+ * + * @param tableName + * name of table + * @return HTable object + * @throws IOException + * @throws IOError + */ + protected HTable getTable(final byte[] tableName) throws + IOException { + String table = new String(tableName); + Map tables = threadLocalTables.get(); + if (!tables.containsKey(table)) { + tables.put(table, new HTable(conf, tableName)); + } + return tables.get(table); + } + + protected HTable getTable(final ByteBuffer tableName) throws IOException { + return getTable(getBytes(tableName)); + } + + /** + * Assigns a unique ID to the scanner and adds the mapping to an internal + * hash-map. + * + * @param scanner + * @return integer scanner id + */ + protected synchronized int addScanner(ResultScanner scanner) { + int id = nextScannerId++; + scannerMap.put(id, scanner); + return id; + } + + /** + * Returns the scanner associated with the specified ID. + * + * @param id + * @return a Scanner, or null if ID was invalid. + */ + protected synchronized ResultScanner getScanner(int id) { + return scannerMap.get(id); + } + + /** + * Removes the scanner associated with the specified ID from the internal + * id->scanner hash-map. + * + * @param id + * @return a Scanner, or null if ID was invalid. + */ + protected synchronized ResultScanner removeScanner(int id) { + return scannerMap.remove(id); + } + + /** + * Constructs an HBaseHandler object. + * @throws IOException + */ + protected HBaseHandler() + throws IOException { + this(HBaseConfiguration.create()); + } + + protected HBaseHandler(final Configuration c) + throws IOException { + this.conf = c; + admin = new HBaseAdmin(conf); + scannerMap = new HashMap(); + } + + @Override + public void enableTable(ByteBuffer tableName) throws IOError { + try{ + admin.enableTable(getBytes(tableName)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void disableTable(ByteBuffer tableName) throws IOError{ + try{ + admin.disableTable(getBytes(tableName)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public boolean isTableEnabled(ByteBuffer tableName) throws IOError { + try { + return HTable.isTableEnabled(this.conf, getBytes(tableName)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void compact(ByteBuffer tableNameOrRegionName) throws IOError { + try{ + admin.compact(getBytes(tableNameOrRegionName)); + } catch (InterruptedException e) { + throw new IOError(e.getMessage()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { + try{ + admin.majorCompact(getBytes(tableNameOrRegionName)); + } catch (InterruptedException e) { + throw new IOError(e.getMessage()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getTableNames() throws IOError { + try { + HTableDescriptor[] tables = this.admin.listTables(); + ArrayList list = new ArrayList(tables.length); + for (int i = 0; i < tables.length; i++) { + list.add(ByteBuffer.wrap(tables[i].getName())); + } + return list; + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getTableRegions(ByteBuffer tableName) + throws IOError { + try{ + List hris = this.admin.getTableRegions(tableName.array()); + List regions = new ArrayList(); + + if (hris != null) { + for (HRegionInfo regionInfo : hris){ + TRegionInfo region = new TRegionInfo(); + region.startKey = 
ByteBuffer.wrap(regionInfo.getStartKey()); + region.endKey = ByteBuffer.wrap(regionInfo.getEndKey()); + region.id = regionInfo.getRegionId(); + region.name = ByteBuffer.wrap(regionInfo.getRegionName()); + region.version = regionInfo.getVersion(); + regions.add(region); + } + } + return regions; + } catch (IOException e){ + throw new IOError(e.getMessage()); + } + } + + @Deprecated + @Override + public List get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) + throws IOError { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return get(tableName, row, famAndQf[0], new byte[0]); + } + return get(tableName, row, famAndQf[0], famAndQf[1]); + } + + protected List get(ByteBuffer tableName, + ByteBuffer row, + byte[] family, + byte[] qualifier) throws IOError { + try { + HTable table = getTable(tableName); + Get get = new Get(getBytes(row)); + if (qualifier == null || qualifier.length == 0) { + get.addFamily(family); + } else { + get.addColumn(family, qualifier); + } + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.raw()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Deprecated + @Override + public List getVer(ByteBuffer tableName, ByteBuffer row, + ByteBuffer column, int numVersions) throws IOError { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return getVer(tableName, row, famAndQf[0], + new byte[0], numVersions); + } + return getVer(tableName, row, + famAndQf[0], famAndQf[1], numVersions); + } + + public List getVer(ByteBuffer tableName, ByteBuffer row, + byte[] family, + byte[] qualifier, int numVersions) throws IOError { + try { + HTable table = getTable(tableName); + Get get = new Get(getBytes(row)); + get.addColumn(family, qualifier); + get.setMaxVersions(numVersions); + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.raw()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Deprecated + @Override + public List getVerTs(ByteBuffer tableName, + ByteBuffer row, + ByteBuffer column, + long timestamp, + int numVersions) throws IOError { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp, + numVersions); + } + return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, + numVersions); + } + + protected List getVerTs(ByteBuffer tableName, + ByteBuffer row, byte [] family, + byte [] qualifier, long timestamp, int numVersions) throws IOError { + try { + HTable table = getTable(tableName); + Get get = new Get(getBytes(row)); + get.addColumn(family, qualifier); + get.setTimeRange(Long.MIN_VALUE, timestamp); + get.setMaxVersions(numVersions); + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.raw()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getRow(ByteBuffer tableName, ByteBuffer row) + throws IOError { + return getRowWithColumnsTs(tableName, row, null, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowWithColumns(ByteBuffer tableName, + ByteBuffer row, + List columns) throws IOError { + return getRowWithColumnsTs(tableName, row, columns, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowTs(ByteBuffer tableName, ByteBuffer row, + long timestamp) throws IOError { + return getRowWithColumnsTs(tableName, row, null, + timestamp); + } + + 
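[Aside, not part of the patch] The three getRow* entry points above all funnel into getRowWithColumnsTs(): a null column list means "fetch all families", and the non-timestamped variants pass HConstants.LATEST_TIMESTAMP as the upper bound of the time range. The following is a minimal client-side sketch of exercising this API through the generated Hbase.Client; the host, port, table name "t1", row key "row1", and column "info:name" are placeholders, and it assumes a server running the patch's defaults (threadpool implementation, binary protocol, unframed transport).

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hbase.thrift.generated.Hbase;
    import org.apache.hadoop.hbase.thrift.generated.TRowResult;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class RowFetchSketch {
      public static void main(String[] args) throws Exception {
        // Connect with the defaults this patch configures: port 9090,
        // unframed transport, binary protocol.
        TTransport transport = new TSocket("localhost", 9090);
        transport.open();
        Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));

        // Columns are passed as "family" or "family:qualifier" byte strings;
        // on the server side the handler splits them with
        // KeyValue.parseColumn().
        List<ByteBuffer> columns = Arrays.asList(
            ByteBuffer.wrap(Bytes.toBytes("info:name")));

        List<TRowResult> rows = client.getRowWithColumns(
            ByteBuffer.wrap(Bytes.toBytes("t1")),
            ByteBuffer.wrap(Bytes.toBytes("row1")),
            columns);
        for (TRowResult row : rows) {
          System.out.println(row);
        }
        transport.close();
      }
    }

Calling client.getRow() with the same table and row would behave identically except that every column family is returned.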
@Override + public List getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row, + List columns, long timestamp) throws IOError { + try { + HTable table = getTable(tableName); + if (columns == null) { + Get get = new Get(getBytes(row)); + get.setTimeRange(Long.MIN_VALUE, timestamp); + Result result = table.get(get); + return ThriftUtilities.rowResultFromHBase(result); + } + Get get = new Get(getBytes(row)); + for(ByteBuffer column : columns) { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + get.addFamily(famAndQf[0]); + } else { + get.addColumn(famAndQf[0], famAndQf[1]); + } + } + get.setTimeRange(Long.MIN_VALUE, timestamp); + Result result = table.get(get); + return ThriftUtilities.rowResultFromHBase(result); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getRows(ByteBuffer tableName, + List rows) + throws IOError { + return getRowsWithColumnsTs(tableName, rows, null, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowsWithColumns(ByteBuffer tableName, + List rows, + List columns) throws IOError { + return getRowsWithColumnsTs(tableName, rows, columns, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowsTs(ByteBuffer tableName, + List rows, + long timestamp) throws IOError { + return getRowsWithColumnsTs(tableName, rows, null, + timestamp); + } + + @Override + public List getRowsWithColumnsTs(ByteBuffer tableName, + List rows, + List columns, long timestamp) throws IOError { + try { + List gets = new ArrayList(rows.size()); + HTable table = getTable(tableName); + for (ByteBuffer row : rows) { + Get get = new Get(getBytes(row)); + if (columns != null) { + + for(ByteBuffer column : columns) { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + get.addFamily(famAndQf[0]); + } else { + get.addColumn(famAndQf[0], famAndQf[1]); + } + } + get.setTimeRange(Long.MIN_VALUE, timestamp); + } + gets.add(get); + } + Result[] result = table.get(gets); + return ThriftUtilities.rowResultFromHBase(result); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column) + throws IOError { + deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void deleteAllTs(ByteBuffer tableName, + ByteBuffer row, + ByteBuffer column, + long timestamp) throws IOError { + try { + HTable table = getTable(tableName); + Delete delete = new Delete(getBytes(row)); + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + delete.deleteFamily(famAndQf[0], timestamp); + } else { + delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); + } + table.delete(delete); + + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void deleteAllRow(ByteBuffer tableName, ByteBuffer row) throws IOError { + deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp) + throws IOError { + try { + HTable table = getTable(tableName); + Delete delete = new Delete(getBytes(row), timestamp, null); + table.delete(delete); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void createTable(ByteBuffer in_tableName, + List columnFamilies) throws IOError, + IllegalArgument, AlreadyExists { + byte [] tableName = 
getBytes(in_tableName); + try { + if (admin.tableExists(tableName)) { + throw new AlreadyExists("table name already in use"); + } + HTableDescriptor desc = new HTableDescriptor(tableName); + for (ColumnDescriptor col : columnFamilies) { + HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); + desc.addFamily(colDesc); + } + admin.createTable(desc); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } catch (IllegalArgumentException e) { + throw new IllegalArgument(e.getMessage()); + } + } + + @Override + public void deleteTable(ByteBuffer in_tableName) throws IOError { + byte [] tableName = getBytes(in_tableName); + if (LOG.isDebugEnabled()) { + LOG.debug("deleteTable: table=" + Bytes.toString(tableName)); + } + try { + if (!admin.tableExists(tableName)) { + throw new IOError("table does not exist"); + } + admin.deleteTable(tableName); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void mutateRow(ByteBuffer tableName, ByteBuffer row, + List mutations) throws IOError, IllegalArgument { + mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, + List mutations, long timestamp) throws IOError, IllegalArgument { + HTable table = null; + try { + table = getTable(tableName); + Put put = new Put(getBytes(row), timestamp, null); + + Delete delete = new Delete(getBytes(row)); + + // I apologize for all this mess :) + for (Mutation m : mutations) { + byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); + if (m.isDelete) { + if (famAndQf.length == 1) { + delete.deleteFamily(famAndQf[0], timestamp); + } else { + delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); + } + } else { + if(famAndQf.length == 1) { + put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, + m.value != null ? m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } else { + put.add(famAndQf[0], famAndQf[1], + m.value != null ? m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } + } + } + if (!delete.isEmpty()) + table.delete(delete); + if (!put.isEmpty()) + table.put(put); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } catch (IllegalArgumentException e) { + throw new IllegalArgument(e.getMessage()); + } + } + + @Override + public void mutateRows(ByteBuffer tableName, List rowBatches) + throws IOError, IllegalArgument, TException { + mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void mutateRowsTs(ByteBuffer tableName, List rowBatches, long timestamp) + throws IOError, IllegalArgument, TException { + List puts = new ArrayList(); + List deletes = new ArrayList(); + + for (BatchMutation batch : rowBatches) { + byte[] row = getBytes(batch.row); + List mutations = batch.mutations; + Delete delete = new Delete(row); + Put put = new Put(row, timestamp, null); + for (Mutation m : mutations) { + byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); + if (m.isDelete) { + // no qualifier, family only. + if (famAndQf.length == 1) { + delete.deleteFamily(famAndQf[0], timestamp); + } else { + delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); + } + } else { + if(famAndQf.length == 1) { + put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, + m.value != null ? m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } else { + put.add(famAndQf[0], famAndQf[1], + m.value != null ? 
m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } + } + } + if (!delete.isEmpty()) + deletes.add(delete); + if (!put.isEmpty()) + puts.add(put); + } + + HTable table = null; + try { + table = getTable(tableName); + if (!puts.isEmpty()) + table.put(puts); + for (Delete del : deletes) { + table.delete(del); + } + } catch (IOException e) { + throw new IOError(e.getMessage()); + } catch (IllegalArgumentException e) { + throw new IllegalArgument(e.getMessage()); + } + } + + @Deprecated + @Override + public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, + long amount) throws IOError, IllegalArgument, TException { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return atomicIncrement(tableName, row, famAndQf[0], new byte[0], + amount); + } + return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); + } + + protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte [] family, + byte [] qualifier, long amount) + throws IOError, IllegalArgument, TException { + HTable table; + try { + table = getTable(tableName); + return table.incrementColumnValue(getBytes(row), family, qualifier, amount); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + public void scannerClose(int id) throws IOError, IllegalArgument { + LOG.debug("scannerClose: id=" + id); + ResultScanner scanner = getScanner(id); + if (scanner == null) { + throw new IllegalArgument("scanner ID is invalid"); + } + scanner.close(); + removeScanner(id); + } + + @Override + public List scannerGetList(int id,int nbRows) throws IllegalArgument, IOError { + LOG.debug("scannerGetList: id=" + id); + ResultScanner scanner = getScanner(id); + if (null == scanner) { + throw new IllegalArgument("scanner ID is invalid"); + } + + Result [] results = null; + try { + results = scanner.next(nbRows); + if (null == results) { + return new ArrayList(); + } + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + return ThriftUtilities.rowResultFromHBase(results); + } + + @Override + public List scannerGet(int id) throws IllegalArgument, IOError { + return scannerGetList(id,1); + } + + public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) throws IOError { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(); + if (tScan.isSetStartRow()) { + scan.setStartRow(tScan.getStartRow()); + } + if (tScan.isSetStopRow()) { + scan.setStopRow(tScan.getStopRow()); + } + if (tScan.isSetTimestamp()) { + scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp()); + } + if (tScan.isSetCaching()) { + scan.setCaching(tScan.getCaching()); + } + if (tScan.isSetColumns() && tScan.getColumns().size() != 0) { + for(ByteBuffer column : tScan.getColumns()) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + if (tScan.isSetFilterString()) { + ParseFilter parseFilter = new ParseFilter(); + scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString())); + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, + List columns) throws IOError { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow)); + if(columns != null && columns.size() != 0) { + for(ByteBuffer column : columns) { + byte [][] famQf = 
KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, + ByteBuffer stopRow, List columns) throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); + if(columns != null && columns.size() != 0) { + for(ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenWithPrefix(ByteBuffer tableName, + ByteBuffer startAndPrefix, + List columns) + throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startAndPrefix)); + Filter f = new WhileMatchFilter( + new PrefixFilter(getBytes(startAndPrefix))); + scan.setFilter(f); + if (columns != null && columns.size() != 0) { + for(ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, + List columns, long timestamp) throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow)); + scan.setTimeRange(Long.MIN_VALUE, timestamp); + if (columns != null && columns.size() != 0) { + for (ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, + ByteBuffer stopRow, List columns, long timestamp) + throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); + scan.setTimeRange(Long.MIN_VALUE, timestamp); + if (columns != null && columns.size() != 0) { + for (ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + scan.setTimeRange(Long.MIN_VALUE, timestamp); + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public Map getColumnDescriptors( + ByteBuffer tableName) throws IOError, TException { + try { + TreeMap columns = + new TreeMap(); + + HTable table = getTable(tableName); + HTableDescriptor desc = table.getTableDescriptor(); + + for (HColumnDescriptor e : desc.getFamilies()) { + ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); + columns.put(col.name, col); + } + return columns; + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override 
+    @Override
+    public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer family) throws IOError {
+      try {
+        HTable table = getTable(getBytes(tableName));
+        Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+      try {
+        HTable table = getTable(HConstants.META_TABLE_NAME);
+        Result startRowResult = table.getRowOrBefore(
+            searchRow.array(), HConstants.CATALOG_FAMILY);
+
+        if (startRowResult == null) {
+          throw new IOException("Cannot find row in .META., row=" +
+              Bytes.toString(searchRow.array()));
+        }
+
+        // find region start and end keys
+        byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        if (value == null || value.length == 0) {
+          throw new IOException("HRegionInfo REGIONINFO was null or " +
+              "empty in Meta for row=" + Bytes.toString(searchRow.array()));
+        }
+        HRegionInfo regionInfo = Writables.getHRegionInfo(value);
+        TRegionInfo region = new TRegionInfo();
+        region.setStartKey(regionInfo.getStartKey());
+        region.setEndKey(regionInfo.getEndKey());
+        region.id = regionInfo.getRegionId();
+        region.setName(regionInfo.getRegionName());
+        region.version = regionInfo.getVersion();
+
+        // find region assignment to server
+        value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.SERVER_QUALIFIER);
+        if (value != null && value.length > 0) {
+          String hostAndPort = Bytes.toString(value);
+          region.setServerName(Bytes.toBytes(
+              Addressing.parseHostname(hostAndPort)));
+          region.port = Addressing.parsePort(hostAndPort);
+        }
+        return region;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+  }
+}
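
For orientation, here is a minimal client-side sketch (not part of this patch) of how the scanner methods implemented above are reached through the generated Thrift API. Everything concrete in it is illustrative: the host, port, table name, and row keys are made up, and it assumes a Thrift server running with the default socket transport and binary protocol.

    import java.nio.ByteBuffer;
    import java.util.List;

    import org.apache.hadoop.hbase.thrift.generated.Hbase;
    import org.apache.hadoop.hbase.thrift.generated.TRowResult;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class ThriftScanSketch {
      public static void main(String[] args) throws Exception {
        // Plain socket + binary protocol: the server-side defaults.
        TTransport transport = new TSocket("localhost", 9090);
        transport.open();
        Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));

        // Bounded scan; a null column list means "all columns" (the handler
        // only adds families/qualifiers when the list is non-empty).
        int scannerId = client.scannerOpenWithStop(
            ByteBuffer.wrap("myTable".getBytes()),  // illustrative table
            ByteBuffer.wrap("row-000".getBytes()),  // start row, inclusive
            ByteBuffer.wrap("row-100".getBytes()),  // stop row, exclusive
            null);
        try {
          List<TRowResult> rows = client.scannerGet(scannerId);
          while (!rows.isEmpty()) {        // an empty list signals end of scan
            for (TRowResult r : rows) {
              System.out.println(new String(r.getRow()));
            }
            rows = client.scannerGet(scannerId);
          }
        } finally {
          client.scannerClose(scannerId);
          transport.close();
        }
      }
    }

scannerGet returns one row per call; scannerGetList(id, nbRows) is the batched variant if round trips matter.
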
  */
 @Category(MediumTests.class)
@@ -100,8 +100,8 @@ public class TestThriftServer {
    */
   @Test
   public void doTestTableCreateDrop() throws Exception {
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     createTestTables(handler);
     dropTestTables(handler);
   }
@@ -140,8 +140,8 @@ public class TestThriftServer {
    */
   public void doTestTableMutations() throws Exception {
     // Setup
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
 
     // Apply a few Mutations to rowA
@@ -213,8 +213,8 @@ public class TestThriftServer {
    */
   public void doTestTableTimestampsAndColumns() throws Exception {
     // Setup
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
 
     // Apply timestamped Mutations to rowA
@@ -292,8 +292,8 @@ public class TestThriftServer {
    */
   public void doTestTableScanners() throws Exception {
     // Setup
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
 
     // Apply timestamped Mutations to rowA
@@ -360,8 +360,8 @@ public class TestThriftServer {
    * @throws Exception
    */
   public void doTestGetTableRegions() throws Exception {
-    ThriftServer.HBaseHandler handler =
-      new ThriftServer.HBaseHandler(UTIL.getConfiguration());
+    ThriftServerRunner.HBaseHandler handler =
+      new ThriftServerRunner.HBaseHandler(UTIL.getConfiguration());
     handler.createTable(tableAname, getColumnDescriptors());
     int regionCount = handler.getTableRegions(tableAname).size();
     assertEquals("empty table should have only 1 region, " +
@@ -456,7 +456,7 @@ public class TestThriftServer {
    * @param handler the HBaseHandler interfacing to HBase
    * @throws Exception
    */
-  private void closeScanner(int scannerId, ThriftServer.HBaseHandler handler) throws Exception {
+  private void closeScanner(int scannerId, ThriftServerRunner.HBaseHandler handler) throws Exception {
     handler.scannerGet(scannerId);
     handler.scannerClose(scannerId);
   }
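
The tests above also show the cheapest way to drive the handler: instantiate ThriftServerRunner.HBaseHandler directly and call it in-process, with no Thrift transport at all. A hypothetical sketch in that style follows; the class and table names are invented, it assumes a cluster reachable through the supplied Configuration, and it is placed in the same package as the tests so the handler constructor is accessible.

    package org.apache.hadoop.hbase.thrift;  // same package as the tests

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;

    public class HandlerSmokeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Drive the handler directly, as TestThriftServer does -- no RPC.
        ThriftServerRunner.HBaseHandler handler =
          new ThriftServerRunner.HBaseHandler(conf);

        // One column family; the trailing colon follows the Thrift naming
        // convention the tests use ("columnA:").
        ColumnDescriptor family = new ColumnDescriptor();
        family.name = ByteBuffer.wrap("f:".getBytes());
        List<ColumnDescriptor> families = new ArrayList<ColumnDescriptor>();
        families.add(family);

        ByteBuffer table = ByteBuffer.wrap("handlerSmokeTable".getBytes());
        handler.createTable(table, families);
        System.out.println("regions: " +
            handler.getTableRegions(table).size());

        // Clean up, mirroring dropTestTables() in the test.
        handler.disableTable(table);
        handler.deleteTable(table);
      }
    }
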
diff --git src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
index 477141f..9607279 100644
--- src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
+++ src/test/java/org/apache/hadoop/hbase/thrift/TestThriftServerCmdLine.java
@@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.LargeTests;
 import org.apache.hadoop.hbase.MediumTests;
-import org.apache.hadoop.hbase.thrift.ThriftServer.ImplType;
+import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -82,7 +82,7 @@ public class TestThriftServerCmdLine {
   @Parameters
   public static Collection<Object[]> getParameters() {
     Collection<Object[]> parameters = new ArrayList<Object[]>();
-    for (ThriftServer.ImplType implType : ThriftServer.ImplType.values()) {
+    for (ImplType implType : ImplType.values()) {
       for (boolean specifyFramed : new boolean[] {false, true}) {
         for (boolean specifyBindIP : new boolean[] {false, true}) {
           if (specifyBindIP && !implType.canSpecifyBindIP) {
@@ -181,7 +181,8 @@ public class TestThriftServerCmdLine {
     } else {
       expectedClass = TBoundedThreadPoolServer.class;
     }
-    assertEquals(expectedClass, thriftServer.server.getClass());
+    assertEquals(expectedClass,
+        thriftServer.serverRunner.tserver.getClass());
 
     if (clientSideException != null) {
       LOG.error("Thrift client threw an exception", clientSideException);