diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutator.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutator.java
new file mode 100644
index 0000000..a870d43
--- /dev/null
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutator.java
@@ -0,0 +1,109 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Used to communicate with a single HBase table similar to {@link Table} but meant for bulk writes.
+ * Obtain an instance from a {@link Connection} and call {@link #close()} afterwards.
+ *
+ *
+ * <p>A BulkMutator buffers {@link Put} operations and sends them to the cluster in batches.
+ * @see ConnectionFactory
+ * @see Connection
+ * @since 1.0.0
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface BulkMutator extends Closeable {
+ /**
+ * Gets the fully qualified table name instance of the table that this BulkMutator writes to.
+ */
+ TableName getName();
+
+ /**
+ * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
+ *
+ * The reference returned is not a copy, so any change made to it will
+ * affect this instance.
+ */
+ Configuration getConfiguration();
+
+ /**
+ * Puts some data in the table. The puts will be buffered and sent over the wire as part of a
+ * batch.
+ *
+ * @param put The data to put.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ void put(Put put) throws IOException;
+
+
+ /**
+ * Puts some data in the table. The puts will be buffered and sent over the wire as part of a
+ * batch.
+ *
+ * @param puts The list of data to put; each entry is buffered and sent as part of a batch.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ void put(List puts) throws IOException;
+
+ /**
+ * Releases any resources held or pending changes in internal buffers.
+ *
+ * @throws IOException if a remote or network exception occurs.
+ */
+ @Override
+ void close() throws IOException;
+
+ /**
+ * Executes all the buffered {@link Put} operations.
+ *
+ * This method is also invoked automatically when the write buffer grows beyond
+ * {@link #getWriteBufferSize()} and when the mutator is closed.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ void flushCommits() throws IOException;
+
+ /**
+ * Returns the maximum size in bytes of the write buffer for this mutator.
+ *
+ * The default value comes from the configuration parameter
+ * {@code hbase.client.write.buffer}.
+ * @return The size of the write buffer in bytes.
+ */
+ long getWriteBufferSize();
+
+ /**
+ * Sets the size of the buffer in bytes.
+ *
+ * If the new size is less than the current amount of data in the
+ * write buffer, the buffer gets flushed.
+ * @param writeBufferSize The new write buffer size, in bytes.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ void setWriteBufferSize(long writeBufferSize) throws IOException;
+}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutatorExceptionListener.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutatorExceptionListener.java
new file mode 100644
index 0000000..325ce7f
--- /dev/null
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutatorExceptionListener.java
@@ -0,0 +1,24 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+/**
+ * Listens for asynchronous exceptions on a {@link BulkMutator}.
+ */
+public interface BulkMutatorExceptionListener {
+ /**
+ * Invoked when a flush of buffered operations fails after retries are exhausted.
+ *
+ * @param exception the accumulated failure details for the failed operations.
+ * @param hBulkMutator the mutator whose flush produced the failure.
+ * @throws RetriesExhaustedWithDetailsException may be rethrown to propagate the failure
+ * to the caller of the flush.
+ */
+ public void onException(RetriesExhaustedWithDetailsException exception, HBulkMutator hBulkMutator)
+ throws RetriesExhaustedWithDetailsException;
+}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBulkMutator.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBulkMutator.java
new file mode 100644
index 0000000..0c50709
--- /dev/null
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBulkMutator.java
@@ -0,0 +1,279 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.util.DoNothingLock;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Used to communicate with a single HBase table similar to {@link HTable} but meant for bulk
+ * writes. Obtain an instance from a {@link Connection} and call {@link #close()} afterwards.
+ *
+ *
+ * <p>Buffers {@link Put} operations and submits them in batches once the write buffer fills.
+ *
+ * @see ConnectionFactory
+ * @see Connection
+ * @since 1.0.0
+ */
+public class HBulkMutator implements BulkMutator {
+
+ private static final Log LOG = LogFactory.getLog(HBulkMutator.class);
+
+ // Notified when a flush fails after retries are exhausted.
+ private BulkMutatorExceptionListener listener;
+
+ protected final Connection connection;
+ private final TableName tableName;
+ private volatile Configuration configuration;
+ private TableConfiguration tableConfiguration;
+ // Pending operations awaiting submission to the cluster.
+ private List writeAsyncBuffer = new LinkedList();
+ private long writeBufferSize;
+ // Running heap-size estimate of the buffered puts.
+ protected long currentWriteBufferSize = 0;
+ private boolean closed = false;
+ private ExecutorService pool;
+ protected AsyncProcess ap;
+ // Serializes buffer access; a DoNothingLock effectively disables locking.
+ private Lock lock;
+
+
+ public HBulkMutator(TableName tableName,
+ Connection connection,
+ ExecutorService pool,
+ RpcRetryingCallerFactory rpcCallerFactory,
+ RpcControllerFactory rpcControllerFactory,
+ TableConfiguration tableConfig,
+ BulkMutatorExceptionListener listener,
+ Lock lock) {
+ if (connection == null || connection.isClosed()) {
+ throw new IllegalArgumentException("Connection is null or closed.");
+ }
+ this.tableName = tableName;
+ this.connection = connection;
+ this.configuration = connection.getConfiguration();
+ this.pool = pool;
+ this.tableConfiguration = tableConfig;
+ this.listener = listener;
+ this.lock = lock;
+
+ this.writeBufferSize = tableConfiguration.getWriteBufferSize();
+
+ // puts need to track errors globally due to how the APIs currently work.
+ ap = new AsyncProcess((ClusterConnection) connection,
+ configuration,
+ pool,
+ rpcCallerFactory,
+ true,
+ rpcControllerFactory);
+ }
+
+ /** For internal testing only; leaves connection, table name and AsyncProcess unset. */
+ @VisibleForTesting
+ HBulkMutator(TableConfiguration tableConfiguration) {
+ this.tableConfiguration = tableConfiguration;
+ connection = null;
+ tableName = null;
+ lock = new DoNothingLock();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public TableName getName() {
+ return tableName;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ // NOTE(review): this overload is synchronized but put(List) below is not — confirm intended.
+ @Override
+ public synchronized void put(Put put) throws InterruptedIOException,
+ RetriesExhaustedWithDetailsException {
+ lock.lock();
+ try {
+ doPut(put);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void put(List puts) throws InterruptedIOException,
+ RetriesExhaustedWithDetailsException {
+ lock.lock();
+ try {
+ for (Put put : puts) {
+ doPut(put);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Add the put to the buffer. If the buffer is already too large, sends the buffer to the cluster.
+ *
+ * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
+ * @throws InterruptedIOException if we were interrupted.
+ */
+ private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+ if (closed) {
+ throw new IllegalStateException("Cannot put when the BulkMutator is closed.");
+ }
+
+ // This behavior is highly non-intuitive... it does not protect us against
+ // 94-incompatible behavior, which is a timing issue because hasError, the below code
+ // and setter of hasError are not synchronized. Perhaps it should be removed.
+ if (ap.hasError()) {
+ writeAsyncBuffer.add(put);
+ backgroundFlushCommits(true);
+ }
+
+ // NOTE(review): if backgroundFlushCommits above returns without throwing, this put is
+ // added to the buffer a second time below and may be written twice — confirm intended.
+ validatePut(put);
+
+ currentWriteBufferSize += put.heapSize();
+ writeAsyncBuffer.add(put);
+
+ while (currentWriteBufferSize > writeBufferSize) {
+ backgroundFlushCommits(false);
+ }
+ }
+
+ // validate for well-formedness (delegates to HTable's size/sanity checks)
+ public void validatePut(final Put put) throws IllegalArgumentException {
+ HTable.validatePut(put, tableConfiguration.getMaxKeyValueSize());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() throws IOException {
+ if (this.closed) {
+ return;
+ }
+ flushCommits();
+ this.pool.shutdown();
+ try {
+ boolean terminated = false;
+ do {
+ // wait until the pool has terminated
+ terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
+ } while (!terminated);
+ } catch (InterruptedException e) {
+ // NOTE(review): interrupt status is swallowed; consider Thread.currentThread().interrupt().
+ LOG.warn("waitForTermination interrupted");
+ }
+ this.closed = true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+ lock.lock();
+ try {
+ // As we can have an operation in progress even if the buffer is empty, we call
+ // backgroundFlushCommits at least one time.
+ backgroundFlushCommits(true);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Send the operations in the buffer to the servers. Does not wait for the server's answer. If
+ * there is an error (max retries reached from a previous flush or bad operation), it tries to
+ * send all operations in the buffer and sends an exception.
+ *
+ * @param synchronous - if true, sends all the writes and wait for all of them to finish before
+ * returning.
+ */
+ private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException,
+ RetriesExhaustedWithDetailsException {
+ try {
+ if (!synchronous) {
+ ap.submit(tableName, writeAsyncBuffer, true, null, false);
+ if (ap.hasError()) {
+ LOG.debug(tableName + ": One or more of the operations have failed -"
+ + " waiting for all operation in progress to finish (successfully or not)");
+ }
+ }
+ if (synchronous || ap.hasError()) {
+ while (!writeAsyncBuffer.isEmpty()) {
+ ap.submit(tableName, writeAsyncBuffer, true, null, false);
+ }
+ RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
+ if (error != null) {
+ this.listener.onException(error, this);
+ }
+ }
+ } finally {
+ // Recompute the buffered heap size from whatever is still waiting in the buffer.
+ currentWriteBufferSize = 0;
+ for (Row mut : writeAsyncBuffer) {
+ if (mut instanceof Mutation) {
+ currentWriteBufferSize += ((Mutation) mut).heapSize();
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException,
+ InterruptedIOException {
+ this.writeBufferSize = writeBufferSize;
+ if (currentWriteBufferSize > writeBufferSize) {
+ flushCommits();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getWriteBufferSize() {
+ return this.writeBufferSize;
+ }
+
+ /** Exposes the internal buffer of pending operations; the returned list is not a copy. */
+ public List getWriteBuffer() {
+ return this.writeAsyncBuffer;
+ }
+}
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 68d3f9f..e9e44a3 100644
--- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1,39 +1,26 @@
/**
*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -70,34 +57,41 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DoNothingLock;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.Descriptors;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
- * An implementation of {@link Table}. Used to communicate with a single HBase table.
- * Lightweight. Get as needed and just close when done.
- * Instances of this class SHOULD NOT be constructed directly.
- * Obtain an instance via {@link Connection}. See {@link ConnectionFactory}
- * class comment for an example of how.
+ * An implementation of {@link Table}. Used to communicate with a single HBase table. Lightweight.
+ * Get as needed and just close when done. Instances of this class SHOULD NOT be constructed
+ * directly. Obtain an instance via {@link Connection}. See {@link ConnectionFactory} class comment
+ * for an example of how.
*
- * This class is NOT thread safe for reads nor writes.
- * In the case of writes (Put, Delete), the underlying write buffer can
- * be corrupted if multiple threads contend over a single HTable instance.
- * In the case of reads, some fields used by a Scan are shared among all threads.
+ *
+ * <p>This class is NOT thread safe for reads nor writes. In the case of writes (Put, Delete), the
+ * underlying write buffer can be corrupted if multiple threads contend over a single HTable
+ * instance. In the case of reads, some fields used by a Scan are shared among all threads.
*
*
HTable is no longer a client API. Use {@link Table} instead. It is marked
- * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in
- * Hadoop
- * Interface Classification
- * There are no guarantees for backwards source / binary compatibility and methods or class can
- * change or go away without deprecation.
+ * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in Hadoop
+ * Interface Classification There are no guarantees for backwards source / binary compatibility
+ * and methods or class can change or go away without deprecation.
*
* @see Table
* @see Admin
@@ -112,65 +106,62 @@ public class HTable implements HTableInterface {
private final TableName tableName;
private volatile Configuration configuration;
private TableConfiguration tableConfiguration;
- protected List writeAsyncBuffer = new LinkedList();
- private long writeBufferSize;
private boolean autoFlush = true;
- protected long currentWriteBufferSize = 0 ;
private boolean closed = false;
protected int scannerCaching;
- private ExecutorService pool; // For Multi & Scan
+ private ExecutorService pool; // For Multi & Scan
private int operationTimeout;
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
private HRegionLocator locator;
- /** The Async process for puts with autoflush set to false or multiputs */
- protected AsyncProcess ap;
/** The Async process for batch */
protected AsyncProcess multiAp;
private RpcRetryingCallerFactory rpcCallerFactory;
private RpcControllerFactory rpcControllerFactory;
+ protected HBulkMutator mutator;
+
/**
* Creates an object to access a HBase table.
+ *
* @param conf Configuration object to use.
* @param tableName Name of the table.
* @throws IOException if a remote or network exception occurs
* @deprecated Constructing HTable objects manually has been deprecated. Please use
- * {@link Connection} to instantiate a {@link Table} instead.
+ * {@link Connection} to instantiate a {@link Table} instead.
*/
@Deprecated
- public HTable(Configuration conf, final String tableName)
- throws IOException {
+ public HTable(Configuration conf, final String tableName) throws IOException {
this(conf, TableName.valueOf(tableName));
}
/**
* Creates an object to access a HBase table.
+ *
* @param conf Configuration object to use.
* @param tableName Name of the table.
* @throws IOException if a remote or network exception occurs
* @deprecated Constructing HTable objects manually has been deprecated. Please use
- * {@link Connection} to instantiate a {@link Table} instead.
+ * {@link Connection} to instantiate a {@link Table} instead.
*/
@Deprecated
- public HTable(Configuration conf, final byte[] tableName)
- throws IOException {
+ public HTable(Configuration conf, final byte[] tableName) throws IOException {
this(conf, TableName.valueOf(tableName));
}
/**
* Creates an object to access a HBase table.
+ *
* @param conf Configuration object to use.
* @param tableName table name pojo
* @throws IOException if a remote or network exception occurs
* @deprecated Constructing HTable objects manually has been deprecated. Please use
- * {@link Connection} to instantiate a {@link Table} instead.
+ * {@link Connection} to instantiate a {@link Table} instead.
*/
@Deprecated
- public HTable(Configuration conf, final TableName tableName)
- throws IOException {
+ public HTable(Configuration conf, final TableName tableName) throws IOException {
this.tableName = tableName;
this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;
if (conf == null) {
@@ -186,6 +177,7 @@ public class HTable implements HTableInterface {
/**
* Creates an object to access a HBase table.
+ *
* @param tableName Name of the table.
* @param connection HConnection to be used.
* @throws IOException if a remote or network exception occurs
@@ -196,7 +188,7 @@ public class HTable implements HTableInterface {
this.tableName = tableName;
this.cleanupPoolOnClose = true;
this.cleanupConnectionOnClose = false;
- this.connection = (ClusterConnection)connection;
+ this.connection = (ClusterConnection) connection;
this.configuration = connection.getConfiguration();
this.pool = getDefaultExecutor(this.configuration);
@@ -216,20 +208,25 @@ public class HTable implements HTableInterface {
// if it is necessary and will grow unbounded. This could be bad but in HCM
// we only create as many Runnables as there are region servers. It means
// it also scales when new region servers are added.
- ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
- new SynchronousQueue(), Threads.newDaemonThreadFactory("htable"));
- ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(1,
+ maxThreads,
+ keepAliveTime,
+ TimeUnit.SECONDS,
+ new SynchronousQueue(),
+ Threads.newDaemonThreadFactory("htable"));
+ pool.allowCoreThreadTimeOut(true);
return pool;
}
/**
* Creates an object to access a HBase table.
+ *
* @param conf Configuration object to use.
* @param tableName Name of the table.
* @param pool ExecutorService to be used.
* @throws IOException if a remote or network exception occurs
* @deprecated Constructing HTable objects manually has been deprecated. Please use
- * {@link Connection} to instantiate a {@link Table} instead.
+ * {@link Connection} to instantiate a {@link Table} instead.
*/
@Deprecated
public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
@@ -239,12 +236,13 @@ public class HTable implements HTableInterface {
/**
* Creates an object to access a HBase table.
+ *
* @param conf Configuration object to use.
* @param tableName Name of the table.
* @param pool ExecutorService to be used.
* @throws IOException if a remote or network exception occurs
* @deprecated Constructing HTable objects manually has been deprecated. Please use
- * {@link Connection} to instantiate a {@link Table} instead.
+ * {@link Connection} to instantiate a {@link Table} instead.
*/
@Deprecated
public HTable(Configuration conf, final TableName tableName, final ExecutorService pool)
@@ -265,6 +263,7 @@ public class HTable implements HTableInterface {
/**
* Creates an object to access a HBase table.
+ *
* @param tableName Name of the table.
* @param connection HConnection to be used.
* @param pool ExecutorService to be used.
@@ -272,29 +271,31 @@ public class HTable implements HTableInterface {
* @deprecated Do not use, internal ctor.
*/
@Deprecated
- public HTable(final byte[] tableName, final Connection connection,
- final ExecutorService pool) throws IOException {
+ public HTable(final byte[] tableName, final Connection connection, final ExecutorService pool)
+ throws IOException {
this(TableName.valueOf(tableName), connection, pool);
}
/** @deprecated Do not use, internal ctor. */
@Deprecated
- public HTable(TableName tableName, final Connection connection,
- final ExecutorService pool) throws IOException {
- this(tableName, (ClusterConnection)connection, null, null, null, pool);
+ public HTable(TableName tableName, final Connection connection, final ExecutorService pool)
+ throws IOException {
+ this(tableName, (ClusterConnection) connection, null, null, null, pool);
}
/**
- * Creates an object to access a HBase table.
- * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to
- * get a {@link Table} instance (use {@link Table} instead of {@link HTable}).
+ * Creates an object to access a HBase table. Used by HBase internally. DO NOT USE. See
+ * {@link ConnectionFactory} class comment for how to get a {@link Table} instance (use
+ * {@link Table} instead of {@link HTable}).
+ *
* @param tableName Name of the table.
* @param connection HConnection to be used.
* @param pool ExecutorService to be used.
* @throws IOException if a remote or network exception occurs
*/
@InterfaceAudience.Private
- public HTable(TableName tableName, final ClusterConnection connection,
+ public HTable(TableName tableName,
+ final ClusterConnection connection,
final TableConfiguration tableConfig,
final RpcRetryingCallerFactory rpcCallerFactory,
final RpcControllerFactory rpcControllerFactory,
@@ -323,6 +324,7 @@ public class HTable implements HTableInterface {
/**
* For internal testing.
+ *
* @throws IOException
*/
@VisibleForTesting
@@ -331,6 +333,7 @@ public class HTable implements HTableInterface {
tableConfiguration = new TableConfiguration();
cleanupPoolOnClose = false;
cleanupConnectionOnClose = false;
+ this.mutator = new HBulkMutator(tableConfiguration);
}
/**
@@ -343,14 +346,13 @@ public class HTable implements HTableInterface {
/**
* setup this HTable's parameter based on the passed configuration
*/
- private void finishSetup() throws IOException {
+ private void finishSetup() {
if (tableConfiguration == null) {
tableConfiguration = new TableConfiguration(configuration);
}
this.operationTimeout = tableName.isSystemTable() ?
tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout();
- this.writeBufferSize = tableConfiguration.getWriteBufferSize();
this.scannerCaching = tableConfiguration.getScannerCaching();
if (this.rpcCallerFactory == null) {
@@ -361,9 +363,22 @@ public class HTable implements HTableInterface {
}
// puts need to track errors globally due to how the APIs currently work.
- ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);
- multiAp = this.connection.getAsyncProcess();
this.locator = new HRegionLocator(getName(), connection);
+ BulkMutatorExceptionListener listener = new BulkMutatorExceptionListener() {
+ @Override
+ public void onException(RetriesExhaustedWithDetailsException exception,
+ HBulkMutator hBulkMutator) throws RetriesExhaustedWithDetailsException {
+ throw exception;
+ }
+ };
+ this.mutator = new HBulkMutator(tableName,
+ connection,
+ pool,
+ rpcCallerFactory,
+ rpcControllerFactory,
+ tableConfiguration,
+ listener,
+ new DoNothingLock());
}
/**
@@ -375,9 +390,9 @@ public class HTable implements HTableInterface {
}
/**
- * Tells whether or not a table is enabled or not. This method creates a
- * new HBase configuration, so it might make your unit tests fail due to
- * incorrect ZK client port.
+ * Tells whether or not a table is enabled or not. This method creates a new HBase configuration,
+ * so it might make your unit tests fail due to incorrect ZK client port.
+ *
* @param tableName Name of table to check.
* @return {@code true} if table is online.
* @throws IOException if a remote or network exception occurs
@@ -389,9 +404,9 @@ public class HTable implements HTableInterface {
}
/**
- * Tells whether or not a table is enabled or not. This method creates a
- * new HBase configuration, so it might make your unit tests fail due to
- * incorrect ZK client port.
+ * Tells whether or not a table is enabled or not. This method creates a new HBase configuration,
+ * so it might make your unit tests fail due to incorrect ZK client port.
+ *
* @param tableName Name of table to check.
* @return {@code true} if table is online.
* @throws IOException if a remote or network exception occurs
@@ -403,9 +418,9 @@ public class HTable implements HTableInterface {
}
/**
- * Tells whether or not a table is enabled or not. This method creates a
- * new HBase configuration, so it might make your unit tests fail due to
- * incorrect ZK client port.
+ * Tells whether or not a table is enabled or not. This method creates a new HBase configuration,
+ * so it might make your unit tests fail due to incorrect ZK client port.
+ *
* @param tableName Name of table to check.
* @return {@code true} if table is online.
* @throws IOException if a remote or network exception occurs
@@ -418,6 +433,7 @@ public class HTable implements HTableInterface {
/**
* Tells whether or not a table is enabled or not.
+ *
* @param conf The Configuration object to use.
* @param tableName Name of table to check.
* @return {@code true} if table is online.
@@ -425,13 +441,13 @@ public class HTable implements HTableInterface {
* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
*/
@Deprecated
- public static boolean isTableEnabled(Configuration conf, String tableName)
- throws IOException {
+ public static boolean isTableEnabled(Configuration conf, String tableName) throws IOException {
return isTableEnabled(conf, TableName.valueOf(tableName));
}
/**
* Tells whether or not a table is enabled or not.
+ *
* @param conf The Configuration object to use.
* @param tableName Name of table to check.
* @return {@code true} if table is online.
@@ -439,13 +455,13 @@ public class HTable implements HTableInterface {
* @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
*/
@Deprecated
- public static boolean isTableEnabled(Configuration conf, byte[] tableName)
- throws IOException {
+ public static boolean isTableEnabled(Configuration conf, byte[] tableName) throws IOException {
return isTableEnabled(conf, TableName.valueOf(tableName));
}
/**
* Tells whether or not a table is enabled or not.
+ *
* @param conf The Configuration object to use.
* @param tableName Name of table to check.
* @return {@code true} if table is online.
@@ -453,8 +469,8 @@ public class HTable implements HTableInterface {
* @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)}
*/
@Deprecated
- public static boolean isTableEnabled(Configuration conf,
- final TableName tableName) throws IOException {
+ public static boolean isTableEnabled(Configuration conf, final TableName tableName)
+ throws IOException {
return HConnectionManager.execute(new HConnectable(conf) {
@Override
public Boolean connect(HConnection connection) throws IOException {
@@ -465,14 +481,14 @@ public class HTable implements HTableInterface {
/**
* Find region location hosting passed row using cached info
+ *
* @param row Row to find.
* @return The location of the given row.
* @throws IOException if a remote or network exception occurs
* @deprecated Use {@link RegionLocator#getRegionLocation(byte[])}
*/
@Deprecated
- public HRegionLocation getRegionLocation(final String row)
- throws IOException {
+ public HRegionLocation getRegionLocation(final String row) throws IOException {
return getRegionLocation(Bytes.toBytes(row), false);
}
@@ -480,8 +496,7 @@ public class HTable implements HTableInterface {
* @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead.
*/
@Deprecated
- public HRegionLocation getRegionLocation(final byte [] row)
- throws IOException {
+ public HRegionLocation getRegionLocation(final byte[] row) throws IOException {
return locator.getRegionLocation(row);
}
@@ -489,16 +504,18 @@ public class HTable implements HTableInterface {
* @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead.
*/
@Deprecated
- public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
- throws IOException {
+ public HRegionLocation getRegionLocation(final byte[] row, boolean reload) throws IOException {
return locator.getRegionLocation(row, reload);
}
/**
* {@inheritDoc}
+ *
+ * @deprecated Use {@link #getName()} instead.
*/
+ @Deprecated
@Override
- public byte [] getTableName() {
+ public byte[] getTableName() {
return this.tableName.getName();
}
@@ -508,12 +525,12 @@ public class HTable implements HTableInterface {
}
/**
- * INTERNAL Used by unit tests and tools to do low-level
- * manipulations.
+ * INTERNAL Used by unit tests and tools to do low-level manipulations.
+ *
* @return An HConnection instance.
* @deprecated This method will be changed from public to package protected.
*/
- // TODO(tsuna): Remove this. Unit tests shouldn't require public helpers.
+ // TODO(tsuna): Remove this. Unit tests shouldn't require public helpers.
@Deprecated
@VisibleForTesting
public HConnection getConnection() {
@@ -524,6 +541,7 @@ public class HTable implements HTableInterface {
* Gets the number of rows that a scanner will fetch at once.
*
* The default value comes from {@code hbase.client.scanner.caching}.
+ *
* @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
*/
@Deprecated
@@ -533,21 +551,22 @@ public class HTable implements HTableInterface {
/**
* Kept in 0.96 for backward compatibility
- * @deprecated since 0.96. This is an internal buffer that should not be read nor write.
+ *
+ * @deprecated since 0.96. This is an internal buffer that should be neither read nor written.
*/
@Deprecated
public List getWriteBuffer() {
- return writeAsyncBuffer;
+ return mutator.getWriteBuffer();
}
/**
* Sets the number of rows that a scanner will fetch at once.
*
- * This will override the value specified by
- * {@code hbase.client.scanner.caching}.
- * Increasing this value will reduce the amount of work needed each time
- * {@code next()} is called on a scanner, at the expense of memory use
- * (since more rows will need to be maintained in memory by the scanners).
+ * This will override the value specified by {@code hbase.client.scanner.caching}. Increasing this
+ * value will reduce the amount of work needed each time {@code next()} is called on a scanner, at
+ * the expense of memory use (since more rows will need to be maintained in memory by the
+ * scanners).
+ *
* @param scannerCaching the number of rows a scanner will fetch at once.
* @deprecated Use {@link Scan#setCaching(int)}
*/
@@ -562,25 +581,27 @@ public class HTable implements HTableInterface {
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
// TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one.
- if (tableName == null) return null;
+ if (tableName == null) {
+ return null;
+ }
if (tableName.equals(TableName.META_TABLE_NAME)) {
return HTableDescriptor.META_TABLEDESC;
}
- HTableDescriptor htd = executeMasterCallable(
- new MasterCallable(getConnection()) {
- @Override
- public HTableDescriptor call(int callTimeout) throws ServiceException {
- GetTableDescriptorsResponse htds;
- GetTableDescriptorsRequest req =
- RequestConverter.buildGetTableDescriptorsRequest(tableName);
- htds = master.getTableDescriptors(null, req);
-
- if (!htds.getTableSchemaList().isEmpty()) {
- return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
- }
- return null;
- }
- });
+ HTableDescriptor htd =
+ executeMasterCallable(new MasterCallable(getConnection()) {
+ @Override
+ public HTableDescriptor call(int callTimeout) throws ServiceException {
+ GetTableDescriptorsResponse htds;
+ GetTableDescriptorsRequest req =
+ RequestConverter.buildGetTableDescriptorsRequest(tableName);
+ htds = master.getTableDescriptors(null, req);
+
+ if (!htds.getTableSchemaList().isEmpty()) {
+ return HTableDescriptor.convert(htds.getTableSchemaList().get(0));
+ }
+ return null;
+ }
+ });
if (htd != null) {
return new UnmodifyableHTableDescriptor(htd);
}
@@ -600,7 +621,7 @@ public class HTable implements HTableInterface {
* @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
*/
@Deprecated
- public byte [][] getStartKeys() throws IOException {
+ public byte[][] getStartKeys() throws IOException {
return locator.getStartKeys();
}
@@ -616,7 +637,7 @@ public class HTable implements HTableInterface {
* @deprecated Use {@link RegionLocator#getStartEndKeys()} instead;
*/
@Deprecated
- public Pair getStartEndKeys() throws IOException {
+ public Pair getStartEndKeys() throws IOException {
return locator.getStartEndKeys();
}
@@ -624,13 +645,15 @@ public class HTable implements HTableInterface {
* Gets all the regions and their address for this table.
*
* This is mainly useful for the MapReduce integration.
+ *
* @return A map of HRegionInfo with it's server address
* @throws IOException if a remote or network exception occurs
- * @deprecated This is no longer a public API. Use {@link #getAllRegionLocations()} instead.
+ * @deprecated This is no longer a public API. Use {@link #getAllRegionLocations()} instead.
*/
@Deprecated
public NavigableMap getRegionLocations() throws IOException {
- // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation.
+ // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an
+ // HRegionLocation.
return MetaScanner.allTableRegions(this.connection, getName());
}
@@ -638,9 +661,10 @@ public class HTable implements HTableInterface {
* Gets all the regions and their address for this table.
*
* This is mainly useful for the MapReduce integration.
+ *
* @return A map of HRegionInfo with it's server address
* @throws IOException if a remote or network exception occurs
- *
+ *
* @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead;
*/
@Deprecated
@@ -651,77 +675,76 @@ public class HTable implements HTableInterface {
/**
* Get the corresponding regions for an arbitrary range of keys.
*
+ *
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range, exclusive
- * @return A list of HRegionLocations corresponding to the regions that
- * contain the specified range
+ * @return A list of HRegionLocations corresponding to the regions that contain the specified
+ * range
* @throws IOException if a remote or network exception occurs
* @deprecated This is no longer a public API
*/
@Deprecated
- public List getRegionsInRange(final byte [] startKey,
- final byte [] endKey) throws IOException {
+ public List getRegionsInRange(final byte[] startKey, final byte[] endKey)
+ throws IOException {
return getRegionsInRange(startKey, endKey, false);
}
/**
* Get the corresponding regions for an arbitrary range of keys.
*
+ *
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range, exclusive
* @param reload true to reload information or false to use cached information
- * @return A list of HRegionLocations corresponding to the regions that
- * contain the specified range
+ * @return A list of HRegionLocations corresponding to the regions that contain the specified
+ * range
* @throws IOException if a remote or network exception occurs
* @deprecated This is no longer a public API
*/
@Deprecated
- public List getRegionsInRange(final byte [] startKey,
- final byte [] endKey, final boolean reload) throws IOException {
+ public List getRegionsInRange(final byte[] startKey, final byte[] endKey,
+ final boolean reload) throws IOException {
return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond();
}
/**
- * Get the corresponding start keys and regions for an arbitrary range of
- * keys.
+ * Get the corresponding start keys and regions for an arbitrary range of keys.
*
+ *
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range
* @param includeEndKey true if endRow is inclusive, false if exclusive
- * @return A pair of list of start keys and list of HRegionLocations that
- * contain the specified range
+ * @return A pair of list of start keys and list of HRegionLocations that contain the specified
+ * range
* @throws IOException if a remote or network exception occurs
* @deprecated This is no longer a public API
*/
@Deprecated
- private Pair, List> getKeysAndRegionsInRange(
- final byte[] startKey, final byte[] endKey, final boolean includeEndKey)
- throws IOException {
+ private Pair, List> getKeysAndRegionsInRange(final byte[] startKey,
+ final byte[] endKey, final boolean includeEndKey) throws IOException {
return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false);
}
/**
- * Get the corresponding start keys and regions for an arbitrary range of
- * keys.
+ * Get the corresponding start keys and regions for an arbitrary range of keys.
*
+ *
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range
* @param includeEndKey true if endRow is inclusive, false if exclusive
* @param reload true to reload information or false to use cached information
- * @return A pair of list of start keys and list of HRegionLocations that
- * contain the specified range
+ * @return A pair of list of start keys and list of HRegionLocations that contain the specified
+ * range
* @throws IOException if a remote or network exception occurs
* @deprecated This is no longer a public API
*/
@Deprecated
- private Pair, List> getKeysAndRegionsInRange(
- final byte[] startKey, final byte[] endKey, final boolean includeEndKey,
- final boolean reload) throws IOException {
- final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW);
+ private Pair, List> getKeysAndRegionsInRange(final byte[] startKey,
+ final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException {
+ final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
- throw new IllegalArgumentException(
- "Invalid range: " + Bytes.toStringBinary(startKey) +
- " > " + Bytes.toStringBinary(endKey));
+ throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(startKey) + " > "
+ + Bytes.toStringBinary(endKey));
}
List keysInRange = new ArrayList();
List regionsInRange = new ArrayList();
@@ -731,45 +754,46 @@ public class HTable implements HTableInterface {
keysInRange.add(currentKey);
regionsInRange.add(regionLocation);
currentKey = regionLocation.getRegionInfo().getEndKey();
- } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
- && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0
- || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
- return new Pair, List>(keysInRange,
- regionsInRange);
+ } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) && (endKeyIsEndOfTable
+ || Bytes.compareTo(currentKey, endKey) < 0
+ || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0)));
+ return new Pair, List>(keysInRange, regionsInRange);
}
/**
* {@inheritDoc}
+ *
* @deprecated Use reversed scan instead.
*/
- @Override
- @Deprecated
- public Result getRowOrBefore(final byte[] row, final byte[] family)
- throws IOException {
- RegionServerCallable callable = new RegionServerCallable(this.connection,
- tableName, row) {
- @Override
- public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
- getLocation().getRegionInfo().getRegionName(), row, family);
- try {
- ClientProtos.GetResponse response = getStub().get(controller, request);
- if (!response.hasResult()) return null;
- return ProtobufUtil.toResult(response.getResult());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
- }
-
- /**
- * The underlying {@link HTable} must not be closed.
- * {@link HTableInterface#getScanner(Scan)} has other usage details.
+ @Override
+ @Deprecated
+ public Result getRowOrBefore(final byte[] row, final byte[] family) throws IOException {
+ RegionServerCallable callable =
+ new RegionServerCallable(this.connection, tableName, row) {
+ @Override
+ public Result call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest(
+ getLocation().getRegionInfo().getRegionName(), row, family);
+ try {
+ ClientProtos.GetResponse response = getStub().get(controller, request);
+ if (!response.hasResult()) {
+ return null;
+ }
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
+ }
+
+ /**
+ * The underlying {@link HTable} must not be closed. {@link Table#getScanner(Scan)} has other
+ * usage details.
*/
@Override
public ResultScanner getScanner(final Scan scan) throws IOException {
@@ -779,45 +803,64 @@ public class HTable implements HTableInterface {
if (scan.isReversed()) {
if (scan.isSmall()) {
- return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
- this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
- pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
+ return new ClientSmallReversedScanner(getConfiguration(),
+ scan,
+ getName(),
+ this.connection,
+ this.rpcCallerFactory,
+ this.rpcControllerFactory,
+ pool,
+ tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
} else {
- return new ReversedClientScanner(getConfiguration(), scan, getName(),
- this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
- pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
+ return new ReversedClientScanner(getConfiguration(),
+ scan,
+ getName(),
+ this.connection,
+ this.rpcCallerFactory,
+ this.rpcControllerFactory,
+ pool,
+ tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
}
}
if (scan.isSmall()) {
- return new ClientSmallScanner(getConfiguration(), scan, getName(),
- this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
- pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
+ return new ClientSmallScanner(getConfiguration(),
+ scan,
+ getName(),
+ this.connection,
+ this.rpcCallerFactory,
+ this.rpcControllerFactory,
+ pool,
+ tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
} else {
- return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
- this.rpcCallerFactory, this.rpcControllerFactory,
- pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
+ return new ClientScanner(getConfiguration(),
+ scan,
+ getName(),
+ this.connection,
+ this.rpcCallerFactory,
+ this.rpcControllerFactory,
+ pool,
+ tableConfiguration.getReplicaCallTimeoutMicroSecondScan());
}
}
/**
- * The underlying {@link HTable} must not be closed.
- * {@link HTableInterface#getScanner(byte[])} has other usage details.
+ * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[])} has other
+ * usage details.
*/
@Override
- public ResultScanner getScanner(byte [] family) throws IOException {
+ public ResultScanner getScanner(byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
return getScanner(scan);
}
/**
- * The underlying {@link HTable} must not be closed.
- * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details.
+ * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[], byte[])} has
+ * other usage details.
*/
@Override
- public ResultScanner getScanner(byte [] family, byte [] qualifier)
- throws IOException {
+ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
Scan scan = new Scan();
scan.addColumn(family, qualifier);
return getScanner(scan);
@@ -828,39 +871,45 @@ public class HTable implements HTableInterface {
*/
@Override
public Result get(final Get get) throws IOException {
- if (get.getConsistency() == null){
+ if (get.getConsistency() == null) {
get.setConsistency(defaultConsistency);
}
if (get.getConsistency() == Consistency.STRONG) {
// Good old call.
- RegionServerCallable callable = new RegionServerCallable(this.connection,
- getName(), get.getRow()) {
- @Override
- public Result call(int callTimeout) throws IOException {
- ClientProtos.GetRequest request =
- RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get);
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- ClientProtos.GetResponse response = getStub().get(controller, request);
- if (response == null) return null;
- return ProtobufUtil.toResult(response.getResult());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
+ RegionServerCallable callable =
+ new RegionServerCallable(this.connection, getName(), get.getRow()) {
+ @Override
+ public Result call(int callTimeout) throws IOException {
+ ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
+ getLocation().getRegionInfo().getRegionName(), get);
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ try {
+ ClientProtos.GetResponse response = getStub().get(controller, request);
+ if (response == null) {
+ return null;
+ }
+ return ProtobufUtil.toResult(response.getResult());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ };
return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
// Call that takes into account the replica
- RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
- rpcControllerFactory, tableName, this.connection, get, pool,
- tableConfiguration.getRetriesNumber(),
- operationTimeout,
- tableConfiguration.getPrimaryCallTimeoutMicroSecond());
+ RpcRetryingCallerWithReadReplicas callable =
+ new RpcRetryingCallerWithReadReplicas(rpcControllerFactory,
+ tableName,
+ this.connection,
+ get,
+ pool,
+ tableConfiguration.getRetriesNumber(),
+ operationTimeout,
+ tableConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call();
}
@@ -871,14 +920,14 @@ public class HTable implements HTableInterface {
@Override
public Result[] get(List gets) throws IOException {
if (gets.size() == 1) {
- return new Result[]{get(gets.get(0))};
+ return new Result[] {get(gets.get(0))};
}
try {
- Object [] r1 = batch((List)gets);
+ Object[] r1 = batch((List) gets);
// translate.
- Result [] results = new Result[r1.length];
- int i=0;
+ Result[] results = new Result[r1.length];
+ int i = 0;
for (Object o : r1) {
// batch ensures if there is a failure we get an exception instead
results[i++] = (Result) o;
@@ -886,7 +935,7 @@ public class HTable implements HTableInterface {
return results;
} catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}
@@ -905,13 +954,14 @@ public class HTable implements HTableInterface {
/**
* {@inheritDoc}
- * @deprecated If any exception is thrown by one of the actions, there is no way to
- * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
+ *
+ * @deprecated If any exception is thrown by one of the actions, there is no way to retrieve the
+ * partially executed results. Use {@link #batch(List, Object[])} instead.
*/
@Deprecated
@Override
- public Object[] batch(final List extends Row> actions)
- throws InterruptedException, IOException {
+ public Object[] batch(final List extends Row> actions) throws InterruptedException,
+ IOException {
Object[] results = new Object[actions.size()];
batch(actions, results);
return results;
@@ -921,24 +971,22 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public void batchCallback(
- final List extends Row> actions, final Object[] results, final Batch.Callback callback)
- throws IOException, InterruptedException {
+ public void batchCallback(final List extends Row> actions, final Object[] results,
+ final Batch.Callback callback) throws IOException, InterruptedException {
connection.processBatchCallback(actions, tableName, pool, results, callback);
}
/**
* {@inheritDoc}
- * @deprecated If any exception is thrown by one of the actions, there is no way to
- * retrieve the partially executed results. Use
- * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
- * instead.
+ *
+ * @deprecated If any exception is thrown by one of the actions, there is no way to retrieve the
+ * partially executed results. Use {@link #batchCallback(List, Object[],
+ * org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)} instead.
*/
@Deprecated
@Override
- public Object[] batchCallback(
- final List extends Row> actions, final Batch.Callback callback) throws IOException,
- InterruptedException {
+ public Object[] batchCallback(final List extends Row> actions,
+ final Batch.Callback callback) throws IOException, InterruptedException {
Object[] results = new Object[actions.size()];
batchCallback(actions, results, callback);
return results;
@@ -948,45 +996,44 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public void delete(final Delete delete)
- throws IOException {
- RegionServerCallable callable = new RegionServerCallable(connection,
- tableName, delete.getRow()) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
-
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), delete);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ public void delete(final Delete delete) throws IOException {
+ RegionServerCallable callable =
+ new RegionServerCallable(connection, tableName, delete.getRow()) {
+ @Override
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), delete);
+ MutateResponse response = getStub().mutate(controller, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ };
+ rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public void delete(final List deletes)
- throws IOException {
+ public void delete(final List deletes) throws IOException {
Object[] results = new Object[deletes.size()];
try {
batch(deletes, results);
} catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} finally {
// mutate list so that it is empty for complete success, or contains only failed records
// results are returned in the same order as the requests in list
- // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
- for (int i = results.length - 1; i>=0; i--) {
+ // walk the list backwards, so we can remove from list without impacting the indexes of
+ // earlier members
+ for (int i = results.length - 1; i >= 0; i--) {
// if result is not null, it succeeded
if (results[i] instanceof Result) {
deletes.remove(i);
@@ -999,9 +1046,9 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public void put(final Put put)
- throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- doPut(put);
+ public void put(final Put put) throws InterruptedIOException,
+ RetriesExhaustedWithDetailsException {
+ this.mutator.put(put);
if (autoFlush) {
flushCommits();
}
@@ -1011,77 +1058,11 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public void put(final List puts)
- throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- for (Put put : puts) {
- doPut(put);
- }
+ public void put(final List puts) throws InterruptedIOException,
+ RetriesExhaustedWithDetailsException {
+ this.mutator.put(puts);
if (autoFlush) {
- flushCommits();
- }
- }
-
-
- /**
- * Add the put to the buffer. If the buffer is already too large, sends the buffer to the
- * cluster.
- * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
- * @throws InterruptedIOException if we were interrupted.
- */
- private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- // This behavior is highly non-intuitive... it does not protect us against
- // 94-incompatible behavior, which is a timing issue because hasError, the below code
- // and setter of hasError are not synchronized. Perhaps it should be removed.
- if (ap.hasError()) {
- writeAsyncBuffer.add(put);
- backgroundFlushCommits(true);
- }
-
- validatePut(put);
-
- currentWriteBufferSize += put.heapSize();
- writeAsyncBuffer.add(put);
-
- while (currentWriteBufferSize > writeBufferSize) {
- backgroundFlushCommits(false);
- }
- }
-
-
- /**
- * Send the operations in the buffer to the servers. Does not wait for the server's answer.
- * If the is an error (max retried reach from a previous flush or bad operation), it tries to
- * send all operations in the buffer and sends an exception.
- * @param synchronous - if true, sends all the writes and wait for all of them to finish before
- * returning.
- */
- private void backgroundFlushCommits(boolean synchronous) throws
- InterruptedIOException, RetriesExhaustedWithDetailsException {
-
- try {
- if (!synchronous) {
- ap.submit(tableName, writeAsyncBuffer, true, null, false);
- if (ap.hasError()) {
- LOG.debug(tableName + ": One or more of the operations have failed -" +
- " waiting for all operation in progress to finish (successfully or not)");
- }
- }
- if (synchronous || ap.hasError()) {
- while (!writeAsyncBuffer.isEmpty()) {
- ap.submit(tableName, writeAsyncBuffer, true, null, false);
- }
- RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
- if (error != null) {
- throw error;
- }
- }
- } finally {
- currentWriteBufferSize = 0;
- for (Row mut : writeAsyncBuffer) {
- if (mut instanceof Mutation) {
- currentWriteBufferSize += ((Mutation) mut).heapSize();
- }
- }
+ mutator.flushCommits();
}
}
@@ -1092,25 +1073,25 @@ public class HTable implements HTableInterface {
public void mutateRow(final RowMutations rm) throws IOException {
RegionServerCallable callable =
new RegionServerCallable(connection, getName(), rm.getRow()) {
- @Override
- public Void call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
- getLocation().getRegionInfo().getRegionName(), rm);
- regionMutationBuilder.setAtomic(true);
- MultiRequest request =
- MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- getStub().multi(controller, request);
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- return null;
- }
- };
- rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ @Override
+ public Void call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ try {
+ RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
+ getLocation().getRegionInfo().getRegionName(), rm);
+ regionMutationBuilder.setAtomic(true);
+ MultiRequest request =
+ MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
+ getStub().multi(controller, request);
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ return null;
+ }
+ };
+ rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@@ -1119,31 +1100,32 @@ public class HTable implements HTableInterface {
@Override
public Result append(final Append append) throws IOException {
if (append.numFamilies() == 0) {
- throw new IOException(
- "Invalid arguments to append, no columns specified");
+ throw new IOException("Invalid arguments to append, no columns specified");
}
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable callable =
- new RegionServerCallable(this.connection, getName(), append.getRow()) {
- @Override
- public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
- MutateResponse response = getStub().mutate(controller, request);
- if (!response.hasResult()) return null;
- return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ new RegionServerCallable(this.connection, getName(), append.getRow()) {
+ @Override
+ public Result call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce);
+ MutateResponse response = getStub().mutate(controller, request);
+ if (!response.hasResult()) {
+ return null;
+ }
+ return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
- }
- };
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@@ -1152,38 +1134,36 @@ public class HTable implements HTableInterface {
@Override
public Result increment(final Increment increment) throws IOException {
if (!increment.hasFamilies()) {
- throw new IOException(
- "Invalid arguments to increment, no columns specified");
+ throw new IOException("Invalid arguments to increment, no columns specified");
}
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
- RegionServerCallable callable = new RegionServerCallable(this.connection,
- getName(), increment.getRow()) {
- @Override
- public Result call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
- MutateResponse response = getStub().mutate(controller, request);
- return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
- }
- }
- };
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ RegionServerCallable callable =
+ new RegionServerCallable(this.connection, getName(), increment.getRow()) {
+ @Override
+ public Result call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
+ try {
+ MutateRequest request = RequestConverter.buildMutateRequest(
+ getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce);
+ MutateResponse response = getStub().mutate(controller, request);
+ return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
+ }
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public long incrementColumnValue(final byte [] row, final byte [] family,
- final byte [] qualifier, final long amount)
- throws IOException {
+ public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier,
+ final long amount) throws IOException {
return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL);
}
@@ -1192,20 +1172,18 @@ public class HTable implements HTableInterface {
*/
@Deprecated
@Override
- public long incrementColumnValue(final byte [] row, final byte [] family,
- final byte [] qualifier, final long amount, final boolean writeToWAL)
- throws IOException {
+ public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier,
+ final long amount, final boolean writeToWAL) throws IOException {
return incrementColumnValue(row, family, qualifier, amount,
- writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT);
+ writeToWAL ? Durability.SKIP_WAL : Durability.USE_DEFAULT);
}
/**
* {@inheritDoc}
*/
@Override
- public long incrementColumnValue(final byte [] row, final byte [] family,
- final byte [] qualifier, final long amount, final Durability durability)
- throws IOException {
+ public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier,
+ final long amount, final Durability durability) throws IOException {
NullPointerException npe = null;
if (row == null) {
npe = new NullPointerException("row is null");
@@ -1215,160 +1193,187 @@ public class HTable implements HTableInterface {
npe = new NullPointerException("qualifier is null");
}
if (npe != null) {
- throw new IOException(
- "Invalid arguments to incrementColumnValue", npe);
+ throw new IOException("Invalid arguments to incrementColumnValue", npe);
}
NonceGenerator ng = this.connection.getNonceGenerator();
final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce();
RegionServerCallable callable =
- new RegionServerCallable(connection, getName(), row) {
- @Override
- public Long call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(getTableName());
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildIncrementRequest(
- getLocation().getRegionInfo().getRegionName(), row, family,
- qualifier, amount, durability, nonceGroup, nonce);
- MutateResponse response = getStub().mutate(controller, request);
- Result result =
- ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
- return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ new RegionServerCallable(connection, getName(), row) {
+ @Override
+ public Long call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(getTableName());
+ controller.setCallTimeout(callTimeout);
+ try {
+ MutateRequest request = RequestConverter.buildIncrementRequest(getLocation()
+ .getRegionInfo().getRegionName(),
+ row,
+ family,
+ qualifier,
+ amount,
+ durability,
+ nonceGroup,
+ nonce);
+ MutateResponse response = getStub().mutate(controller, request);
+ Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
+ return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier)));
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
- }
- };
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public boolean checkAndPut(final byte [] row,
- final byte [] family, final byte [] qualifier, final byte [] value,
- final Put put)
- throws IOException {
+ public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
+ final byte[] value, final Put put) throws IOException {
RegionServerCallable callable =
- new RegionServerCallable(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), CompareType.EQUAL, put);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ new RegionServerCallable(connection, getName(), row) {
+ @Override
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ try {
+ MutateRequest request =
+ RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
+ row,
+ family,
+ qualifier,
+ new BinaryComparator(value),
+ CompareType.EQUAL,
+ put);
+ MutateResponse response = getStub().mutate(controller, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
- }
- };
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public boolean checkAndPut(final byte [] row, final byte [] family,
- final byte [] qualifier, final CompareOp compareOp, final byte [] value,
- final Put put)
- throws IOException {
+ public boolean checkAndPut(final byte[] row,
+ final byte[] family,
+ final byte[] qualifier,
+ final CompareOp compareOp,
+ final byte[] value,
+ final Put put) throws IOException {
RegionServerCallable callable =
- new RegionServerCallable(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- CompareType compareType = CompareType.valueOf(compareOp.name());
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, put);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ new RegionServerCallable(connection, getName(), row) {
+ @Override
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = new PayloadCarryingRpcController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ try {
+ CompareType compareType = CompareType.valueOf(compareOp.name());
+ MutateRequest request =
+ RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
+ row,
+ family,
+ qualifier,
+ new BinaryComparator(value),
+ compareType,
+ put);
+ MutateResponse response = getStub().mutate(controller, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
- }
- };
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public boolean checkAndDelete(final byte [] row,
- final byte [] family, final byte [] qualifier, final byte [] value,
- final Delete delete)
- throws IOException {
+ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
+ final byte[] value, final Delete delete) throws IOException {
RegionServerCallable callable =
- new RegionServerCallable(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), CompareType.EQUAL, delete);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ new RegionServerCallable(connection, getName(), row) {
+ @Override
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ try {
+ MutateRequest request =
+ RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
+ row,
+ family,
+ qualifier,
+ new BinaryComparator(value),
+ CompareType.EQUAL,
+ delete);
+ MutateResponse response = getStub().mutate(controller, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
- }
- };
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public boolean checkAndDelete(final byte [] row, final byte [] family,
- final byte [] qualifier, final CompareOp compareOp, final byte [] value,
- final Delete delete)
- throws IOException {
+ public boolean checkAndDelete(final byte[] row,
+ final byte[] family,
+ final byte[] qualifier,
+ final CompareOp compareOp,
+ final byte[] value,
+ final Delete delete) throws IOException {
RegionServerCallable callable =
- new RegionServerCallable(connection, getName(), row) {
- @Override
- public Boolean call(int callTimeout) throws IOException {
- PayloadCarryingRpcController controller = rpcControllerFactory.newController();
- controller.setPriority(tableName);
- controller.setCallTimeout(callTimeout);
- try {
- CompareType compareType = CompareType.valueOf(compareOp.name());
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, delete);
- MutateResponse response = getStub().mutate(controller, request);
- return Boolean.valueOf(response.getProcessed());
- } catch (ServiceException se) {
- throw ProtobufUtil.getRemoteException(se);
+ new RegionServerCallable(connection, getName(), row) {
+ @Override
+ public Boolean call(int callTimeout) throws IOException {
+ PayloadCarryingRpcController controller = rpcControllerFactory.newController();
+ controller.setPriority(tableName);
+ controller.setCallTimeout(callTimeout);
+ try {
+ CompareType compareType = CompareType.valueOf(compareOp.name());
+ MutateRequest request =
+ RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
+ row,
+ family,
+ qualifier,
+ new BinaryComparator(value),
+ compareType,
+ delete);
+ MutateResponse response = getStub().mutate(controller, request);
+ return Boolean.valueOf(response.getProcessed());
+ } catch (ServiceException se) {
+ throw ProtobufUtil.getRemoteException(se);
+ }
}
- }
- };
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ };
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
* {@inheritDoc}
*/
@Override
- public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final RowMutations rm)
- throws IOException {
+ public boolean checkAndMutate(final byte[] row,
+ final byte[] family,
+ final byte[] qualifier,
+ final CompareOp compareOp,
+ final byte[] value,
+ final RowMutations rm) throws IOException {
RegionServerCallable callable =
new RegionServerCallable(connection, getName(), row) {
@Override
@@ -1378,9 +1383,14 @@ public class HTable implements HTableInterface {
controller.setCallTimeout(callTimeout);
try {
CompareType compareType = CompareType.valueOf(compareOp.name());
- MultiRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, rm);
+ MultiRequest request =
+ RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(),
+ row,
+ family,
+ qualifier,
+ new BinaryComparator(value),
+ compareType,
+ rm);
ClientProtos.MultiResponse response = getStub().multi(controller, request);
return Boolean.valueOf(response.getProcessed());
} catch (ServiceException se) {
@@ -1388,7 +1398,7 @@ public class HTable implements HTableInterface {
}
}
};
- return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout);
+ return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@@ -1407,10 +1417,14 @@ public class HTable implements HTableInterface {
*/
@Override
public boolean[] existsAll(final List gets) throws IOException {
- if (gets.isEmpty()) return new boolean[]{};
- if (gets.size() == 1) return new boolean[]{exists(gets.get(0))};
+ if (gets.isEmpty()) {
+ return new boolean[] {};
+ }
+ if (gets.size() == 1) {
+ return new boolean[] {exists(gets.get(0))};
+ }
- for (Get g: gets){
+ for (Get g : gets) {
g.setCheckExistenceOnly(true);
}
@@ -1418,7 +1432,7 @@ public class HTable implements HTableInterface {
try {
r1 = batch(gets);
} catch (InterruptedException e) {
- throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
// translate.
@@ -1426,7 +1440,7 @@ public class HTable implements HTableInterface {
int i = 0;
for (Object o : r1) {
// batch ensures if there is a failure we get an exception instead
- results[i++] = ((Result)o).getExists();
+ results[i++] = ((Result) o).getExists();
}
return results;
@@ -1451,35 +1465,31 @@ public class HTable implements HTableInterface {
*/
@Override
public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
- // As we can have an operation in progress even if the buffer is empty, we call
- // backgroundFlushCommits at least one time.
- backgroundFlushCommits(true);
+ this.mutator.flushCommits();
}
/**
- * Process a mixed batch of Get, Put and Delete actions. All actions for a
- * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
+ * Process a mixed batch of Get, Put and Delete actions. All actions for a RegionServer are
+ * forwarded in one RPC call. Queries are executed in parallel.
*
* @param list The collection of actions.
- * @param results An empty array, same size as list. If an exception is thrown,
- * you can test here for partial results, and to determine which actions
- * processed successfully.
- * @throws IOException if there are problems talking to META. Per-item
- * exceptions are stored in the results array.
- */
- public void processBatchCallback(
- final List extends Row> list, final Object[] results, final Batch.Callback callback)
- throws IOException, InterruptedException {
+ * @param results An empty array, same size as list. If an exception is thrown, you can test here
+ * for partial results, and to determine which actions processed successfully.
+ * @throws IOException if there are problems talking to META. Per-item exceptions are stored in
+ * the results array.
+ */
+ public void processBatchCallback(final List extends Row> list, final Object[] results,
+ final Batch.Callback callback) throws IOException, InterruptedException {
this.batchCallback(list, results, callback);
}
/**
- * Parameterized batch processing, allowing varying return types for different
- * {@link Row} implementations.
+ * Parameterized batch processing, allowing varying return types for different {@link Row}
+ * implementations.
*/
public void processBatch(final List extends Row> list, final Object[] results)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
this.batch(list, results);
}
@@ -1511,11 +1521,6 @@ public class HTable implements HTableInterface {
}
// validate for well-formedness
- public void validatePut(final Put put) throws IllegalArgumentException {
- validatePut(put, tableConfiguration.getMaxKeyValueSize());
- }
-
- // validate for well-formedness
public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
@@ -1567,33 +1572,32 @@ public class HTable implements HTableInterface {
/**
* Returns the maximum size in bytes of the write buffer for this HTable.
*
- * The default value comes from the configuration parameter
- * {@code hbase.client.write.buffer}.
+ * The default value comes from the configuration parameter {@code hbase.client.write.buffer}.
+ *
* @return The size of the write buffer in bytes.
*/
@Override
public long getWriteBufferSize() {
- return writeBufferSize;
+ return this.mutator.getWriteBufferSize();
}
/**
* Sets the size of the buffer in bytes.
*
- * If the new size is less than the current amount of data in the
- * write buffer, the buffer gets flushed.
+ * If the new size is less than the current amount of data in the write buffer, the buffer gets
+ * flushed.
+ *
* @param writeBufferSize The new write buffer size, in bytes.
* @throws IOException if a remote or network exception occurs.
*/
@Override
public void setWriteBufferSize(long writeBufferSize) throws IOException {
- this.writeBufferSize = writeBufferSize;
- if(currentWriteBufferSize > writeBufferSize) {
- flushCommits();
- }
+ this.mutator.setWriteBufferSize(writeBufferSize);
}
/**
* The pool is used for mutli requests for this HTable
+ *
* @return the pool used for mutli
*/
ExecutorService getPool() {
@@ -1601,66 +1605,58 @@ public class HTable implements HTableInterface {
}
/**
- * Enable or disable region cache prefetch for the table. It will be
- * applied for the given table's all HTable instances who share the same
- * connection. By default, the cache prefetch is enabled.
+   * Enable or disable region cache prefetch for the table. It will be applied to all HTable
+   * instances of the given table that share the same connection. By default, the cache prefetch is enabled.
+ *
* @param tableName name of table to configure.
- * @param enable Set to true to enable region cache prefetch. Or set to
- * false to disable it.
+ * @param enable Set to true to enable region cache prefetch. Or set to false to disable it.
* @throws IOException
* @deprecated does nothing since 0.99
*/
@Deprecated
- public static void setRegionCachePrefetch(final byte[] tableName,
- final boolean enable) throws IOException {
- }
+ public static void setRegionCachePrefetch(final byte[] tableName, final boolean enable)
+ throws IOException {}
/**
* @deprecated does nothing since 0.99
*/
@Deprecated
- public static void setRegionCachePrefetch(
- final TableName tableName,
- final boolean enable) throws IOException {
- }
+ public static void setRegionCachePrefetch(final TableName tableName, final boolean enable)
+ throws IOException {}
/**
- * Enable or disable region cache prefetch for the table. It will be
- * applied for the given table's all HTable instances who share the same
- * connection. By default, the cache prefetch is enabled.
+   * Enable or disable region cache prefetch for the table. It will be applied to all HTable
+   * instances of the given table that share the same connection. By default, the cache prefetch is enabled.
+ *
* @param conf The Configuration object to use.
* @param tableName name of table to configure.
- * @param enable Set to true to enable region cache prefetch. Or set to
- * false to disable it.
+ * @param enable Set to true to enable region cache prefetch. Or set to false to disable it.
* @throws IOException
* @deprecated does nothing since 0.99
*/
@Deprecated
- public static void setRegionCachePrefetch(final Configuration conf,
- final byte[] tableName, final boolean enable) throws IOException {
- }
+ public static void setRegionCachePrefetch(final Configuration conf, final byte[] tableName,
+ final boolean enable) throws IOException {}
/**
* @deprecated does nothing since 0.99
*/
@Deprecated
- public static void setRegionCachePrefetch(final Configuration conf,
- final TableName tableName,
- final boolean enable) throws IOException {
- }
+ public static void setRegionCachePrefetch(final Configuration conf, final TableName tableName,
+ final boolean enable) throws IOException {}
/**
* Check whether region cache prefetch is enabled or not for the table.
+ *
* @param conf The Configuration object to use.
* @param tableName name of table to check
- * @return true if table's region cache prefecth is enabled. Otherwise
- * it is disabled.
+   * @return true if table's region cache prefetch is enabled. Otherwise it is disabled.
* @throws IOException
* @deprecated always return false since 0.99
*/
@Deprecated
- public static boolean getRegionCachePrefetch(final Configuration conf,
- final byte[] tableName) throws IOException {
+ public static boolean getRegionCachePrefetch(final Configuration conf, final byte[] tableName)
+ throws IOException {
return false;
}
@@ -1668,16 +1664,16 @@ public class HTable implements HTableInterface {
* @deprecated always return false since 0.99
*/
@Deprecated
- public static boolean getRegionCachePrefetch(final Configuration conf,
- final TableName tableName) throws IOException {
+ public static boolean getRegionCachePrefetch(final Configuration conf, final TableName tableName)
+ throws IOException {
return false;
}
/**
* Check whether region cache prefetch is enabled or not for the table.
+ *
* @param tableName name of table to check
- * @return true if table's region cache prefecth is enabled. Otherwise
- * it is disabled.
+   * @return true if table's region cache prefetch is enabled. Otherwise it is disabled.
* @throws IOException
* @deprecated always return false since 0.99
*/
@@ -1690,14 +1686,13 @@ public class HTable implements HTableInterface {
* @deprecated always return false since 0.99
*/
@Deprecated
- public static boolean getRegionCachePrefetch(
- final TableName tableName) throws IOException {
+ public static boolean getRegionCachePrefetch(final TableName tableName) throws IOException {
return false;
}
/**
- * Explicitly clears the region cache to fetch the latest value from META.
- * This is a power user function: avoid unless you know the ramifications.
+ * Explicitly clears the region cache to fetch the latest value from META. This is a power user
+ * function: avoid unless you know the ramifications.
*/
public void clearRegionCache() {
this.connection.clearRegionCache();
@@ -1715,11 +1710,11 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public Map coprocessorService(final Class service,
- byte[] startKey, byte[] endKey, final Batch.Call callable)
- throws ServiceException, Throwable {
- final Map results = Collections.synchronizedMap(
- new TreeMap(Bytes.BYTES_COMPARATOR));
+ public Map coprocessorService(final Class service,
+ byte[] startKey, byte[] endKey, final Batch.Call callable) throws ServiceException,
+ Throwable {
+ final Map results =
+ Collections.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));
coprocessorService(service, startKey, endKey, callable, new Batch.Callback() {
@Override
public void update(byte[] region, byte[] row, R value) {
@@ -1735,34 +1730,32 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public void coprocessorService(final Class service,
- byte[] startKey, byte[] endKey, final Batch.Call callable,
- final Batch.Callback callback) throws ServiceException, Throwable {
+ public void coprocessorService(final Class service, byte[] startKey,
+ byte[] endKey, final Batch.Call callable, final Batch.Callback callback)
+ throws ServiceException, Throwable {
// get regions covered by the row range
List keys = getStartKeysInRange(startKey, endKey);
- Map> futures =
- new TreeMap>(Bytes.BYTES_COMPARATOR);
+ Map> futures = new TreeMap>(Bytes.BYTES_COMPARATOR);
for (final byte[] r : keys) {
final RegionCoprocessorRpcChannel channel =
new RegionCoprocessorRpcChannel(connection, tableName, r);
- Future future = pool.submit(
- new Callable() {
- @Override
- public R call() throws Exception {
- T instance = ProtobufUtil.newServiceStub(service, channel);
- R result = callable.call(instance);
- byte[] region = channel.getLastRegion();
- if (callback != null) {
- callback.update(region, r, result);
- }
- return result;
- }
- });
+ Future future = pool.submit(new Callable() {
+ @Override
+ public R call() throws Exception {
+ T instance = ProtobufUtil.newServiceStub(service, channel);
+ R result = callable.call(instance);
+ byte[] region = channel.getLastRegion();
+ if (callback != null) {
+ callback.update(region, r, result);
+ }
+ return result;
+ }
+ });
futures.put(r, future);
}
- for (Map.Entry> e : futures.entrySet()) {
+ for (Map.Entry> e : futures.entrySet()) {
try {
e.getValue().get();
} catch (ExecutionException ee) {
@@ -1770,15 +1763,13 @@ public class HTable implements HTableInterface {
+ Bytes.toStringBinary(e.getKey()), ee);
throw ee.getCause();
} catch (InterruptedException ie) {
- throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
- + " for row " + Bytes.toStringBinary(e.getKey()))
- .initCause(ie);
+ throw new InterruptedIOException("Interrupted calling coprocessor service "
+ + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie);
}
}
}
- private List getStartKeysInRange(byte[] start, byte[] end)
- throws IOException {
+ private List getStartKeysInRange(byte[] start, byte[] end) throws IOException {
if (start == null) {
start = HConstants.EMPTY_START_ROW;
}
@@ -1806,11 +1797,15 @@ public class HTable implements HTableInterface {
*/
@Override
public Map batchCoprocessorService(
- Descriptors.MethodDescriptor methodDescriptor, Message request,
- byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
- final Map results = Collections.synchronizedMap(new TreeMap(
- Bytes.BYTES_COMPARATOR));
- batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
+ Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey,
+ byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
+ final Map results =
+ Collections.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));
+ batchCoprocessorService(methodDescriptor,
+ request,
+ startKey,
+ endKey,
+ responsePrototype,
new Callback() {
@Override
@@ -1827,10 +1822,13 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
- public void batchCoprocessorService(
- final Descriptors.MethodDescriptor methodDescriptor, final Message request,
- byte[] startKey, byte[] endKey, final R responsePrototype, final Callback callback)
- throws ServiceException, Throwable {
+ public <
+ R extends Message> void batchCoprocessorService(final Descriptors.MethodDescriptor methodDescriptor,
+ final Message request,
+ byte[] startKey,
+ byte[] endKey,
+ final R responsePrototype,
+ final Callback callback) throws ServiceException, Throwable {
// get regions covered by the row range
Pair, List> keysAndRegions =
@@ -1840,8 +1838,8 @@ public class HTable implements HTableInterface {
// check if we have any calls to make
if (keys.isEmpty()) {
- LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) +
- ", end=" + Bytes.toStringBinary(endKey));
+ LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey)
+ + ", end=" + Bytes.toStringBinary(endKey));
return;
}
@@ -1864,26 +1862,26 @@ public class HTable implements HTableInterface {
final List callbackErrorServers = new ArrayList();
Object[] results = new Object[execs.size()];
- AsyncProcess asyncProcess =
- new AsyncProcess(connection, configuration, pool,
- RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
- true, RpcControllerFactory.instantiate(configuration));
+ AsyncProcess asyncProcess = new AsyncProcess(connection,
+ configuration,
+ pool,
+ RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
+ true,
+ RpcControllerFactory.instantiate(configuration));
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
new Callback() {
@Override
public void update(byte[] region, byte[] row,
- ClientProtos.CoprocessorServiceResult serviceResult) {
+ ClientProtos.CoprocessorServiceResult serviceResult) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() +
- ": region=" + Bytes.toStringBinary(region) +
- ", row=" + Bytes.toStringBinary(row) +
- ", value=" + serviceResult.getValue().getValue());
+ LOG.trace("Received result for endpoint " + methodDescriptor.getFullName()
+ + ": region=" + Bytes.toStringBinary(region) + ", row="
+ + Bytes.toStringBinary(row) + ", value=" + serviceResult.getValue().getValue());
}
try {
- callback.update(region, row,
- (R) responsePrototype.newBuilderForType().mergeFrom(
- serviceResult.getValue().getValue()).build());
+ callback.update(region, row, (R) responsePrototype.newBuilderForType()
+ .mergeFrom(serviceResult.getValue().getValue()).build());
} catch (InvalidProtocolBufferException e) {
LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(),
e);
diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java
new file mode 100644
index 0000000..1c51887
--- /dev/null
+++ hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java
@@ -0,0 +1,57 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.hbase.client.HBulkMutator;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * An implementation of {@link Lock} that doesn't actually lock anything. {@link HBulkMutator} uses
+ * a Lock, but there are cases where no synchronization is needed; this no-op implementation lets
+ * such callers avoid locking overhead while sharing a single code path.
+ */
+public class DoNothingLock implements Lock {
+
+  @Override
+  public void lock() {
+  }
+
+  @Override
+  public void lockInterruptibly() throws InterruptedException {
+  }
+
+  @Override
+  public boolean tryLock() {
+    return true; // a no-op lock is always "acquired", mirroring lock() succeeding unconditionally
+  }
+
+  @Override
+  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+    return true; // acquisition always succeeds immediately; the timeout never comes into play
+  }
+
+  @Override
+  public void unlock() {
+  }
+
+  @Override
+  public Condition newCondition() {
+    throw new UnsupportedOperationException("DoNothingLock does not support conditions");
+  }
+}
diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 88a95fb..1fffd86 100644
--- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -155,8 +155,8 @@ public class TestAsyncProcess {
new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
}
- public MyAsyncProcess(
- ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) {
+ public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
+ @SuppressWarnings("unused") boolean dummy) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) {
@Override
@@ -649,7 +649,7 @@ public class TestAsyncProcess {
@Test
public void testHTablePutSuccess() throws Exception {
- HTable ht = Mockito.mock(HTable.class);
+ HBulkMutator ht = Mockito.mock(HBulkMutator.class);
ht.ap = new MyAsyncProcess(createHConnection(), conf, true);
Put put = createPut(1, true);
@@ -662,7 +662,7 @@ public class TestAsyncProcess {
private void doHTableFailedPut(boolean bufferOn) throws Exception {
HTable ht = new HTable();
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
- ht.ap = ap;
+ ht.mutator.ap = ap;
ht.setAutoFlushTo(true);
if (bufferOn) {
ht.setWriteBufferSize(1024L * 1024L);
@@ -672,7 +672,7 @@ public class TestAsyncProcess {
Put put = createPut(1, false);
- Assert.assertEquals(0L, ht.currentWriteBufferSize);
+ Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
try {
ht.put(put);
if (bufferOn) {
@@ -681,7 +681,7 @@ public class TestAsyncProcess {
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals(0L, ht.currentWriteBufferSize);
+ Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize);
// The table should have sent one request, maybe after multiple attempts
AsyncRequestFuture ars = null;
for (AsyncRequestFuture someReqs : ap.allReqs) {
@@ -710,7 +710,7 @@ public class TestAsyncProcess {
public void testHTableFailedPutAndNewPut() throws Exception {
HTable ht = new HTable();
MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true);
- ht.ap = ap;
+ ht.mutator.ap = ap;
ht.setAutoFlushTo(false);
ht.setWriteBufferSize(0);
@@ -725,13 +725,15 @@ public class TestAsyncProcess {
// puts, we may raise an exception in the middle of the list. It's then up to the caller to
// manage what was inserted, what was tried but failed, and what was not even tried.
p = createPut(1, true);
- Assert.assertEquals(0, ht.writeAsyncBuffer.size());
+ Assert.assertEquals(0, ht.mutator.getWriteBuffer().size());
try {
ht.put(p);
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
- Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size());
+ Assert.assertEquals("the put should not been inserted.", 0, ht.mutator.getWriteBuffer().size());
+
+ ht.close();
}
@@ -801,11 +803,11 @@ public class TestAsyncProcess {
ht.connection = new MyConnectionImpl(configuration);
MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true);
- ht.ap = ap;
+ ht.mutator.ap = ap;
- Assert.assertNotNull(ht.ap.createServerErrorTracker());
- Assert.assertTrue(ht.ap.serverTrackerTimeout > 200);
- ht.ap.serverTrackerTimeout = 1;
+ Assert.assertNotNull(ht.mutator.ap.createServerErrorTracker());
+ Assert.assertTrue(ht.mutator.ap.serverTrackerTimeout > 200);
+ ht.mutator.ap.serverTrackerTimeout = 1;
Put p = createPut(1, false);
ht.setAutoFlushTo(false);
@@ -818,6 +820,7 @@ public class TestAsyncProcess {
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+ ht.close();
}
@Test
@@ -825,9 +828,9 @@ public class TestAsyncProcess {
HTable ht = new HTable();
ht.connection = new MyConnectionImpl(conf);
AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf);
- ht.ap = ap;
+ ht.mutator.ap = ap;
- Assert.assertNotNull(ht.ap.createServerErrorTracker());
+ Assert.assertNotNull(ht.mutator.ap.createServerErrorTracker());
Put p = createPut(1, true);
ht.setAutoFlushTo(false);
@@ -840,6 +843,7 @@ public class TestAsyncProcess {
}
// Checking that the ErrorsServers came into play and didn't make us stop immediately
Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get());
+ ht.close();
}
/**
@@ -867,7 +871,7 @@ public class TestAsyncProcess {
MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads);
ht.multiAp = ap;
- ht.batch(gets);
+ ht.batch(gets, new Object[gets.size()]);
Assert.assertEquals(ap.nbActions.get(), NB_REGS);
Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());