diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutator.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutator.java new file mode 100644 index 0000000..a870d43 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutator.java @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Used to communicate with a single HBase table similar to {@Table} but meant for bulk writes. + * Obtain an instance from a {@link Connection} and call {@link #close()} afterwards. + * + *

Table can be used to get, put, delete or scan data from a table. + * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface BulkMutator extends Closeable { + /** + * Gets the fully qualified table name instance of the table that this BulkMutator writes to. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *

+ * The reference returned is not a copy, so any change made to it will + * affect this instance. + */ + Configuration getConfiguration(); + + /** + * Puts some data in the table. The puts will be buffered and sent over the wire as part of a + * batch. + * + * @param put The data to put. + * @throws IOException if a remote or network exception occurs. + */ + void put(Put put) throws IOException; + + + /** + * Puts some data in the table. The puts will be buffered and sent over the wire as part of a + * batch. + * + * @param put The data to put. + * @throws IOException if a remote or network exception occurs. + */ + void put(List puts) throws IOException; + + /** + * Releases any resources held or pending changes in internal buffers. + * + * @throws IOException if a remote or network exception occurs. + */ + @Override + void close() throws IOException; + + /** + * Executes all the buffered {@link Put} operations. + *

+ * This method gets called once automatically for every {@link Put} or batch + * of {@link Put}s (when put(List) is used) when + * @throws IOException if a remote or network exception occurs. + */ + void flushCommits() throws IOException; + + /** + * Returns the maximum size in bytes of the write buffer for this HTable. + *

+ * The default value comes from the configuration parameter + * {@code hbase.client.write.buffer}. + * @return The size of the write buffer in bytes. + */ + long getWriteBufferSize(); + + /** + * Sets the size of the buffer in bytes. + *

+ * If the new size is less than the current amount of data in the + * write buffer, the buffer gets flushed. + * @param writeBufferSize The new write buffer size, in bytes. + * @throws IOException if a remote or network exception occurs. + */ + void setWriteBufferSize(long writeBufferSize) throws IOException; +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutatorExceptionListener.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutatorExceptionListener.java new file mode 100644 index 0000000..325ce7f --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BulkMutatorExceptionListener.java @@ -0,0 +1,24 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Listens for asynchronous exceptions on a {@link BulkMutator}. + */ +public interface BulkMutatorExceptionListener { + public void onException(RetriesExhaustedWithDetailsException exception, HBulkMutator hBulkMutator) + throws RetriesExhaustedWithDetailsException; +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBulkMutator.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBulkMutator.java new file mode 100644 index 0000000..0c50709 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBulkMutator.java @@ -0,0 +1,279 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.DoNothingLock; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +/** + * Used to communicate with a single HBase table similar to {@HTable} but meant for bulk + * writes. Obtain an instance from a {@link Connection} and call {@link #close()} afterwards. + * + *

Table can be used to get, put, delete or scan data from a table. + * + * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +public class HBulkMutator implements BulkMutator { + + private static final Log LOG = LogFactory.getLog(HBulkMutator.class); + + private BulkMutatorExceptionListener listener; + + protected final Connection connection; + private final TableName tableName; + private volatile Configuration configuration; + private TableConfiguration tableConfiguration; + private List writeAsyncBuffer = new LinkedList(); + private long writeBufferSize; + protected long currentWriteBufferSize = 0; + private boolean closed = false; + private ExecutorService pool; + protected AsyncProcess ap; + private Lock lock; + + + public HBulkMutator(TableName tableName, + Connection connection, + ExecutorService pool, + RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, + TableConfiguration tableConfig, + BulkMutatorExceptionListener listener, + Lock lock) { + if (connection == null || connection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + this.tableName = tableName; + this.connection = connection; + this.configuration = connection.getConfiguration(); + this.pool = pool; + this.tableConfiguration = tableConfig; + this.listener = listener; + this.lock = lock; + + this.writeBufferSize = tableConfiguration.getWriteBufferSize(); + + // puts need to track errors globally due to how the APIs currently work. + ap = new AsyncProcess((ClusterConnection) connection, + configuration, + pool, + rpcCallerFactory, + true, + rpcControllerFactory); + } + + @VisibleForTesting + HBulkMutator(TableConfiguration tableConfiguration) { + this.tableConfiguration = tableConfiguration; + connection = null; + tableName = null; + lock = new DoNothingLock(); + } + + /** + * {@inheritDoc} + */ + @Override + public TableName getName() { + return tableName; + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getConfiguration() { + return configuration; + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized void put(Put put) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + lock.lock(); + try { + doPut(put); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void put(List puts) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + lock.lock(); + try { + for (Put put : puts) { + doPut(put); + } + } finally { + lock.unlock(); + } + } + + /** + * Add the put to the buffer. If the buffer is already too large, sends the buffer to the cluster. + * + * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. + * @throws InterruptedIOException if we were interrupted. + */ + private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { + if (closed) { + throw new IllegalStateException("Cannot put when the BulkMutator is closed."); + } + + // This behavior is highly non-intuitive... it does not protect us against + // 94-incompatible behavior, which is a timing issue because hasError, the below code + // and setter of hasError are not synchronized. Perhaps it should be removed. + if (ap.hasError()) { + writeAsyncBuffer.add(put); + backgroundFlushCommits(true); + } + + validatePut(put); + + currentWriteBufferSize += put.heapSize(); + writeAsyncBuffer.add(put); + + while (currentWriteBufferSize > writeBufferSize) { + backgroundFlushCommits(false); + } + } + + // validate for well-formedness + public void validatePut(final Put put) throws IllegalArgumentException { + HTable.validatePut(put, tableConfiguration.getMaxKeyValueSize()); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + if (this.closed) { + return; + } + flushCommits(); + this.pool.shutdown(); + try { + boolean terminated = false; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + } while (!terminated); + } catch (InterruptedException e) { + LOG.warn("waitForTermination interrupted"); + } + this.closed = true; + } + + /** + * {@inheritDoc} + */ + @Override + public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException { + lock.lock(); + try { + // As we can have an operation in progress even if the buffer is empty, we call + // backgroundFlushCommits at least one time. + backgroundFlushCommits(true); + } finally { + lock.unlock(); + } + } + + /** + * Send the operations in the buffer to the servers. Does not wait for the server's answer. If the + * is an error (max retried reach from a previous flush or bad operation), it tries to send all + * operations in the buffer and sends an exception. + * + * @param synchronous - if true, sends all the writes and wait for all of them to finish before + * returning. + */ + private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + try { + if (!synchronous) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + if (ap.hasError()) { + LOG.debug(tableName + ": One or more of the operations have failed -" + + " waiting for all operation in progress to finish (successfully or not)"); + } + } + if (synchronous || ap.hasError()) { + while (!writeAsyncBuffer.isEmpty()) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + } + RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); + if (error != null) { + this.listener.onException(error, this); + } + } + } finally { + currentWriteBufferSize = 0; + for (Row mut : writeAsyncBuffer) { + if (mut instanceof Mutation) { + currentWriteBufferSize += ((Mutation) mut).heapSize(); + } + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, + InterruptedIOException { + this.writeBufferSize = writeBufferSize; + if (currentWriteBufferSize > writeBufferSize) { + flushCommits(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getWriteBufferSize() { + return this.writeBufferSize; + } + + public List getWriteBuffer() { + return this.writeAsyncBuffer; + } +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 68d3f9f..e9e44a3 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1,39 +1,26 @@ /** * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.hadoop.hbase.client; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.Descriptors; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.Service; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -70,34 +57,41 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DoNothingLock; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.Descriptors; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** - * An implementation of {@link Table}. Used to communicate with a single HBase table. - * Lightweight. Get as needed and just close when done. - * Instances of this class SHOULD NOT be constructed directly. - * Obtain an instance via {@link Connection}. See {@link ConnectionFactory} - * class comment for an example of how. + * An implementation of {@link Table}. Used to communicate with a single HBase table. Lightweight. + * Get as needed and just close when done. Instances of this class SHOULD NOT be constructed + * directly. Obtain an instance via {@link Connection}. See {@link ConnectionFactory} class comment + * for an example of how. * - *

This class is NOT thread safe for reads nor writes. - * In the case of writes (Put, Delete), the underlying write buffer can - * be corrupted if multiple threads contend over a single HTable instance. - * In the case of reads, some fields used by a Scan are shared among all threads. + *

This class is NOT thread safe for reads nor writes. In the case of writes (Put, Delete), the + * underlying write buffer can be corrupted if multiple threads contend over a single HTable + * instance. In the case of reads, some fields used by a Scan are shared among all threads. * *

HTable is no longer a client API. Use {@link Table} instead. It is marked - * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in - * Hadoop - * Interface Classification - * There are no guarantees for backwards source / binary compatibility and methods or class can - * change or go away without deprecation. + * InterfaceAudience.Private indicating that this is an HBase-internal class as defined in Hadoop + * Interface Classification There are no guarantees for backwards source / binary compatibility + * and methods or class can change or go away without deprecation. * * @see Table * @see Admin @@ -112,65 +106,62 @@ public class HTable implements HTableInterface { private final TableName tableName; private volatile Configuration configuration; private TableConfiguration tableConfiguration; - protected List writeAsyncBuffer = new LinkedList(); - private long writeBufferSize; private boolean autoFlush = true; - protected long currentWriteBufferSize = 0 ; private boolean closed = false; protected int scannerCaching; - private ExecutorService pool; // For Multi & Scan + private ExecutorService pool; // For Multi & Scan private int operationTimeout; private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; private HRegionLocator locator; - /** The Async process for puts with autoflush set to false or multiputs */ - protected AsyncProcess ap; /** The Async process for batch */ protected AsyncProcess multiAp; private RpcRetryingCallerFactory rpcCallerFactory; private RpcControllerFactory rpcControllerFactory; + protected HBulkMutator mutator; + /** * Creates an object to access a HBase table. + * * @param conf Configuration object to use. * @param tableName Name of the table. * @throws IOException if a remote or network exception occurs * @deprecated Constructing HTable objects manually has been deprecated. Please use - * {@link Connection} to instantiate a {@link Table} instead. + * {@link Connection} to instantiate a {@link Table} instead. */ @Deprecated - public HTable(Configuration conf, final String tableName) - throws IOException { + public HTable(Configuration conf, final String tableName) throws IOException { this(conf, TableName.valueOf(tableName)); } /** * Creates an object to access a HBase table. + * * @param conf Configuration object to use. * @param tableName Name of the table. * @throws IOException if a remote or network exception occurs * @deprecated Constructing HTable objects manually has been deprecated. Please use - * {@link Connection} to instantiate a {@link Table} instead. + * {@link Connection} to instantiate a {@link Table} instead. */ @Deprecated - public HTable(Configuration conf, final byte[] tableName) - throws IOException { + public HTable(Configuration conf, final byte[] tableName) throws IOException { this(conf, TableName.valueOf(tableName)); } /** * Creates an object to access a HBase table. + * * @param conf Configuration object to use. * @param tableName table name pojo * @throws IOException if a remote or network exception occurs * @deprecated Constructing HTable objects manually has been deprecated. Please use - * {@link Connection} to instantiate a {@link Table} instead. + * {@link Connection} to instantiate a {@link Table} instead. */ @Deprecated - public HTable(Configuration conf, final TableName tableName) - throws IOException { + public HTable(Configuration conf, final TableName tableName) throws IOException { this.tableName = tableName; this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true; if (conf == null) { @@ -186,6 +177,7 @@ public class HTable implements HTableInterface { /** * Creates an object to access a HBase table. + * * @param tableName Name of the table. * @param connection HConnection to be used. * @throws IOException if a remote or network exception occurs @@ -196,7 +188,7 @@ public class HTable implements HTableInterface { this.tableName = tableName; this.cleanupPoolOnClose = true; this.cleanupConnectionOnClose = false; - this.connection = (ClusterConnection)connection; + this.connection = (ClusterConnection) connection; this.configuration = connection.getConfiguration(); this.pool = getDefaultExecutor(this.configuration); @@ -216,20 +208,25 @@ public class HTable implements HTableInterface { // if it is necessary and will grow unbounded. This could be bad but in HCM // we only create as many Runnables as there are region servers. It means // it also scales when new region servers are added. - ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, - new SynchronousQueue(), Threads.newDaemonThreadFactory("htable")); - ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true); + ThreadPoolExecutor pool = new ThreadPoolExecutor(1, + maxThreads, + keepAliveTime, + TimeUnit.SECONDS, + new SynchronousQueue(), + Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); return pool; } /** * Creates an object to access a HBase table. + * * @param conf Configuration object to use. * @param tableName Name of the table. * @param pool ExecutorService to be used. * @throws IOException if a remote or network exception occurs * @deprecated Constructing HTable objects manually has been deprecated. Please use - * {@link Connection} to instantiate a {@link Table} instead. + * {@link Connection} to instantiate a {@link Table} instead. */ @Deprecated public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool) @@ -239,12 +236,13 @@ public class HTable implements HTableInterface { /** * Creates an object to access a HBase table. + * * @param conf Configuration object to use. * @param tableName Name of the table. * @param pool ExecutorService to be used. * @throws IOException if a remote or network exception occurs * @deprecated Constructing HTable objects manually has been deprecated. Please use - * {@link Connection} to instantiate a {@link Table} instead. + * {@link Connection} to instantiate a {@link Table} instead. */ @Deprecated public HTable(Configuration conf, final TableName tableName, final ExecutorService pool) @@ -265,6 +263,7 @@ public class HTable implements HTableInterface { /** * Creates an object to access a HBase table. + * * @param tableName Name of the table. * @param connection HConnection to be used. * @param pool ExecutorService to be used. @@ -272,29 +271,31 @@ public class HTable implements HTableInterface { * @deprecated Do not use, internal ctor. */ @Deprecated - public HTable(final byte[] tableName, final Connection connection, - final ExecutorService pool) throws IOException { + public HTable(final byte[] tableName, final Connection connection, final ExecutorService pool) + throws IOException { this(TableName.valueOf(tableName), connection, pool); } /** @deprecated Do not use, internal ctor. */ @Deprecated - public HTable(TableName tableName, final Connection connection, - final ExecutorService pool) throws IOException { - this(tableName, (ClusterConnection)connection, null, null, null, pool); + public HTable(TableName tableName, final Connection connection, final ExecutorService pool) + throws IOException { + this(tableName, (ClusterConnection) connection, null, null, null, pool); } /** - * Creates an object to access a HBase table. - * Used by HBase internally. DO NOT USE. See {@link ConnectionFactory} class comment for how to - * get a {@link Table} instance (use {@link Table} instead of {@link HTable}). + * Creates an object to access a HBase table. Used by HBase internally. DO NOT USE. See + * {@link ConnectionFactory} class comment for how to get a {@link Table} instance (use + * {@link Table} instead of {@link HTable}). + * * @param tableName Name of the table. * @param connection HConnection to be used. * @param pool ExecutorService to be used. * @throws IOException if a remote or network exception occurs */ @InterfaceAudience.Private - public HTable(TableName tableName, final ClusterConnection connection, + public HTable(TableName tableName, + final ClusterConnection connection, final TableConfiguration tableConfig, final RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, @@ -323,6 +324,7 @@ public class HTable implements HTableInterface { /** * For internal testing. + * * @throws IOException */ @VisibleForTesting @@ -331,6 +333,7 @@ public class HTable implements HTableInterface { tableConfiguration = new TableConfiguration(); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; + this.mutator = new HBulkMutator(tableConfiguration); } /** @@ -343,14 +346,13 @@ public class HTable implements HTableInterface { /** * setup this HTable's parameter based on the passed configuration */ - private void finishSetup() throws IOException { + private void finishSetup() { if (tableConfiguration == null) { tableConfiguration = new TableConfiguration(configuration); } this.operationTimeout = tableName.isSystemTable() ? tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); - this.writeBufferSize = tableConfiguration.getWriteBufferSize(); this.scannerCaching = tableConfiguration.getScannerCaching(); if (this.rpcCallerFactory == null) { @@ -361,9 +363,22 @@ public class HTable implements HTableInterface { } // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); - multiAp = this.connection.getAsyncProcess(); this.locator = new HRegionLocator(getName(), connection); + BulkMutatorExceptionListener listener = new BulkMutatorExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException exception, + HBulkMutator hBulkMutator) throws RetriesExhaustedWithDetailsException { + throw exception; + } + }; + this.mutator = new HBulkMutator(tableName, + connection, + pool, + rpcCallerFactory, + rpcControllerFactory, + tableConfiguration, + listener, + new DoNothingLock()); } /** @@ -375,9 +390,9 @@ public class HTable implements HTableInterface { } /** - * Tells whether or not a table is enabled or not. This method creates a - * new HBase configuration, so it might make your unit tests fail due to - * incorrect ZK client port. + * Tells whether or not a table is enabled or not. This method creates a new HBase configuration, + * so it might make your unit tests fail due to incorrect ZK client port. + * * @param tableName Name of table to check. * @return {@code true} if table is online. * @throws IOException if a remote or network exception occurs @@ -389,9 +404,9 @@ public class HTable implements HTableInterface { } /** - * Tells whether or not a table is enabled or not. This method creates a - * new HBase configuration, so it might make your unit tests fail due to - * incorrect ZK client port. + * Tells whether or not a table is enabled or not. This method creates a new HBase configuration, + * so it might make your unit tests fail due to incorrect ZK client port. + * * @param tableName Name of table to check. * @return {@code true} if table is online. * @throws IOException if a remote or network exception occurs @@ -403,9 +418,9 @@ public class HTable implements HTableInterface { } /** - * Tells whether or not a table is enabled or not. This method creates a - * new HBase configuration, so it might make your unit tests fail due to - * incorrect ZK client port. + * Tells whether or not a table is enabled or not. This method creates a new HBase configuration, + * so it might make your unit tests fail due to incorrect ZK client port. + * * @param tableName Name of table to check. * @return {@code true} if table is online. * @throws IOException if a remote or network exception occurs @@ -418,6 +433,7 @@ public class HTable implements HTableInterface { /** * Tells whether or not a table is enabled or not. + * * @param conf The Configuration object to use. * @param tableName Name of table to check. * @return {@code true} if table is online. @@ -425,13 +441,13 @@ public class HTable implements HTableInterface { * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])} */ @Deprecated - public static boolean isTableEnabled(Configuration conf, String tableName) - throws IOException { + public static boolean isTableEnabled(Configuration conf, String tableName) throws IOException { return isTableEnabled(conf, TableName.valueOf(tableName)); } /** * Tells whether or not a table is enabled or not. + * * @param conf The Configuration object to use. * @param tableName Name of table to check. * @return {@code true} if table is online. @@ -439,13 +455,13 @@ public class HTable implements HTableInterface { * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])} */ @Deprecated - public static boolean isTableEnabled(Configuration conf, byte[] tableName) - throws IOException { + public static boolean isTableEnabled(Configuration conf, byte[] tableName) throws IOException { return isTableEnabled(conf, TableName.valueOf(tableName)); } /** * Tells whether or not a table is enabled or not. + * * @param conf The Configuration object to use. * @param tableName Name of table to check. * @return {@code true} if table is online. @@ -453,8 +469,8 @@ public class HTable implements HTableInterface { * @deprecated use {@link HBaseAdmin#isTableEnabled(org.apache.hadoop.hbase.TableName tableName)} */ @Deprecated - public static boolean isTableEnabled(Configuration conf, - final TableName tableName) throws IOException { + public static boolean isTableEnabled(Configuration conf, final TableName tableName) + throws IOException { return HConnectionManager.execute(new HConnectable(conf) { @Override public Boolean connect(HConnection connection) throws IOException { @@ -465,14 +481,14 @@ public class HTable implements HTableInterface { /** * Find region location hosting passed row using cached info + * * @param row Row to find. * @return The location of the given row. * @throws IOException if a remote or network exception occurs * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} */ @Deprecated - public HRegionLocation getRegionLocation(final String row) - throws IOException { + public HRegionLocation getRegionLocation(final String row) throws IOException { return getRegionLocation(Bytes.toBytes(row), false); } @@ -480,8 +496,7 @@ public class HTable implements HTableInterface { * @deprecated Use {@link RegionLocator#getRegionLocation(byte[])} instead. */ @Deprecated - public HRegionLocation getRegionLocation(final byte [] row) - throws IOException { + public HRegionLocation getRegionLocation(final byte[] row) throws IOException { return locator.getRegionLocation(row); } @@ -489,16 +504,18 @@ public class HTable implements HTableInterface { * @deprecated Use {@link RegionLocator#getRegionLocation(byte[], boolean)} instead. */ @Deprecated - public HRegionLocation getRegionLocation(final byte [] row, boolean reload) - throws IOException { + public HRegionLocation getRegionLocation(final byte[] row, boolean reload) throws IOException { return locator.getRegionLocation(row, reload); } /** * {@inheritDoc} + * + * @deprecated */ + @Deprecated @Override - public byte [] getTableName() { + public byte[] getTableName() { return this.tableName.getName(); } @@ -508,12 +525,12 @@ public class HTable implements HTableInterface { } /** - * INTERNAL Used by unit tests and tools to do low-level - * manipulations. + * INTERNAL Used by unit tests and tools to do low-level manipulations. + * * @return An HConnection instance. * @deprecated This method will be changed from public to package protected. */ - // TODO(tsuna): Remove this. Unit tests shouldn't require public helpers. + // TODO(tsuna): Remove this. Unit tests shouldn't require public helpers. @Deprecated @VisibleForTesting public HConnection getConnection() { @@ -524,6 +541,7 @@ public class HTable implements HTableInterface { * Gets the number of rows that a scanner will fetch at once. *

* The default value comes from {@code hbase.client.scanner.caching}. + * * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()} */ @Deprecated @@ -533,21 +551,22 @@ public class HTable implements HTableInterface { /** * Kept in 0.96 for backward compatibility - * @deprecated since 0.96. This is an internal buffer that should not be read nor write. + * + * @deprecated since 0.96. This is an internal buffer that should not be read nor write. */ @Deprecated public List getWriteBuffer() { - return writeAsyncBuffer; + return mutator.getWriteBuffer(); } /** * Sets the number of rows that a scanner will fetch at once. *

- * This will override the value specified by - * {@code hbase.client.scanner.caching}. - * Increasing this value will reduce the amount of work needed each time - * {@code next()} is called on a scanner, at the expense of memory use - * (since more rows will need to be maintained in memory by the scanners). + * This will override the value specified by {@code hbase.client.scanner.caching}. Increasing this + * value will reduce the amount of work needed each time {@code next()} is called on a scanner, at + * the expense of memory use (since more rows will need to be maintained in memory by the + * scanners). + * * @param scannerCaching the number of rows a scanner will fetch at once. * @deprecated Use {@link Scan#setCaching(int)} */ @@ -562,25 +581,27 @@ public class HTable implements HTableInterface { @Override public HTableDescriptor getTableDescriptor() throws IOException { // TODO: This is the same as HBaseAdmin.getTableDescriptor(). Only keep one. - if (tableName == null) return null; + if (tableName == null) { + return null; + } if (tableName.equals(TableName.META_TABLE_NAME)) { return HTableDescriptor.META_TABLEDESC; } - HTableDescriptor htd = executeMasterCallable( - new MasterCallable(getConnection()) { - @Override - public HTableDescriptor call(int callTimeout) throws ServiceException { - GetTableDescriptorsResponse htds; - GetTableDescriptorsRequest req = - RequestConverter.buildGetTableDescriptorsRequest(tableName); - htds = master.getTableDescriptors(null, req); - - if (!htds.getTableSchemaList().isEmpty()) { - return HTableDescriptor.convert(htds.getTableSchemaList().get(0)); - } - return null; - } - }); + HTableDescriptor htd = + executeMasterCallable(new MasterCallable(getConnection()) { + @Override + public HTableDescriptor call(int callTimeout) throws ServiceException { + GetTableDescriptorsResponse htds; + GetTableDescriptorsRequest req = + RequestConverter.buildGetTableDescriptorsRequest(tableName); + htds = master.getTableDescriptors(null, req); + + if (!htds.getTableSchemaList().isEmpty()) { + return HTableDescriptor.convert(htds.getTableSchemaList().get(0)); + } + return null; + } + }); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -600,7 +621,7 @@ public class HTable implements HTableInterface { * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead; */ @Deprecated - public byte [][] getStartKeys() throws IOException { + public byte[][] getStartKeys() throws IOException { return locator.getStartKeys(); } @@ -616,7 +637,7 @@ public class HTable implements HTableInterface { * @deprecated Use {@link RegionLocator#getStartEndKeys()} instead; */ @Deprecated - public Pair getStartEndKeys() throws IOException { + public Pair getStartEndKeys() throws IOException { return locator.getStartEndKeys(); } @@ -624,13 +645,15 @@ public class HTable implements HTableInterface { * Gets all the regions and their address for this table. *

* This is mainly useful for the MapReduce integration. + * * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs - * @deprecated This is no longer a public API. Use {@link #getAllRegionLocations()} instead. + * @deprecated This is no longer a public API. Use {@link #getAllRegionLocations()} instead. */ @Deprecated public NavigableMap getRegionLocations() throws IOException { - // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an HRegionLocation. + // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocator, singular, returns an + // HRegionLocation. return MetaScanner.allTableRegions(this.connection, getName()); } @@ -638,9 +661,10 @@ public class HTable implements HTableInterface { * Gets all the regions and their address for this table. *

* This is mainly useful for the MapReduce integration. + * * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs - * + * * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead; */ @Deprecated @@ -651,77 +675,76 @@ public class HTable implements HTableInterface { /** * Get the corresponding regions for an arbitrary range of keys. *

+ * * @param startKey Starting row in range, inclusive * @param endKey Ending row in range, exclusive - * @return A list of HRegionLocations corresponding to the regions that - * contain the specified range + * @return A list of HRegionLocations corresponding to the regions that contain the specified + * range * @throws IOException if a remote or network exception occurs * @deprecated This is no longer a public API */ @Deprecated - public List getRegionsInRange(final byte [] startKey, - final byte [] endKey) throws IOException { + public List getRegionsInRange(final byte[] startKey, final byte[] endKey) + throws IOException { return getRegionsInRange(startKey, endKey, false); } /** * Get the corresponding regions for an arbitrary range of keys. *

+ * * @param startKey Starting row in range, inclusive * @param endKey Ending row in range, exclusive * @param reload true to reload information or false to use cached information - * @return A list of HRegionLocations corresponding to the regions that - * contain the specified range + * @return A list of HRegionLocations corresponding to the regions that contain the specified + * range * @throws IOException if a remote or network exception occurs * @deprecated This is no longer a public API */ @Deprecated - public List getRegionsInRange(final byte [] startKey, - final byte [] endKey, final boolean reload) throws IOException { + public List getRegionsInRange(final byte[] startKey, final byte[] endKey, + final boolean reload) throws IOException { return getKeysAndRegionsInRange(startKey, endKey, false, reload).getSecond(); } /** - * Get the corresponding start keys and regions for an arbitrary range of - * keys. + * Get the corresponding start keys and regions for an arbitrary range of keys. *

+ * * @param startKey Starting row in range, inclusive * @param endKey Ending row in range * @param includeEndKey true if endRow is inclusive, false if exclusive - * @return A pair of list of start keys and list of HRegionLocations that - * contain the specified range + * @return A pair of list of start keys and list of HRegionLocations that contain the specified + * range * @throws IOException if a remote or network exception occurs * @deprecated This is no longer a public API */ @Deprecated - private Pair, List> getKeysAndRegionsInRange( - final byte[] startKey, final byte[] endKey, final boolean includeEndKey) - throws IOException { + private Pair, List> getKeysAndRegionsInRange(final byte[] startKey, + final byte[] endKey, final boolean includeEndKey) throws IOException { return getKeysAndRegionsInRange(startKey, endKey, includeEndKey, false); } /** - * Get the corresponding start keys and regions for an arbitrary range of - * keys. + * Get the corresponding start keys and regions for an arbitrary range of keys. *

+ * * @param startKey Starting row in range, inclusive * @param endKey Ending row in range * @param includeEndKey true if endRow is inclusive, false if exclusive * @param reload true to reload information or false to use cached information - * @return A pair of list of start keys and list of HRegionLocations that - * contain the specified range + * @return A pair of list of start keys and list of HRegionLocations that contain the specified + * range * @throws IOException if a remote or network exception occurs * @deprecated This is no longer a public API */ @Deprecated - private Pair, List> getKeysAndRegionsInRange( - final byte[] startKey, final byte[] endKey, final boolean includeEndKey, - final boolean reload) throws IOException { - final boolean endKeyIsEndOfTable = Bytes.equals(endKey,HConstants.EMPTY_END_ROW); + private Pair, List> getKeysAndRegionsInRange(final byte[] startKey, + final byte[] endKey, final boolean includeEndKey, final boolean reload) throws IOException { + final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { - throw new IllegalArgumentException( - "Invalid range: " + Bytes.toStringBinary(startKey) + - " > " + Bytes.toStringBinary(endKey)); + throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(startKey) + " > " + + Bytes.toStringBinary(endKey)); } List keysInRange = new ArrayList(); List regionsInRange = new ArrayList(); @@ -731,45 +754,46 @@ public class HTable implements HTableInterface { keysInRange.add(currentKey); regionsInRange.add(regionLocation); currentKey = regionLocation.getRegionInfo().getEndKey(); - } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) - && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 - || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); - return new Pair, List>(keysInRange, - regionsInRange); + } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) && (endKeyIsEndOfTable + || Bytes.compareTo(currentKey, endKey) < 0 + || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); + return new Pair, List>(keysInRange, regionsInRange); } /** * {@inheritDoc} + * * @deprecated Use reversed scan instead. */ - @Override - @Deprecated - public Result getRowOrBefore(final byte[] row, final byte[] family) - throws IOException { - RegionServerCallable callable = new RegionServerCallable(this.connection, - tableName, row) { - @Override - public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest( - getLocation().getRegionInfo().getRegionName(), row, family); - try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); - } - - /** - * The underlying {@link HTable} must not be closed. - * {@link HTableInterface#getScanner(Scan)} has other usage details. + @Override + @Deprecated + public Result getRowOrBefore(final byte[] row, final byte[] family) throws IOException { + RegionServerCallable callable = + new RegionServerCallable(this.connection, tableName, row) { + @Override + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + ClientProtos.GetRequest request = RequestConverter.buildGetRowOrBeforeRequest( + getLocation().getRegionInfo().getRegionName(), row, family); + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (!response.hasResult()) { + return null; + } + return ProtobufUtil.toResult(response.getResult()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); + } + + /** + * The underlying {@link HTable} must not be closed. {@link Table#getScanner(Scan)} has other + * usage details. */ @Override public ResultScanner getScanner(final Scan scan) throws IOException { @@ -779,45 +803,64 @@ public class HTable implements HTableInterface { if (scan.isReversed()) { if (scan.isSmall()) { - return new ClientSmallReversedScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ClientSmallReversedScanner(getConfiguration(), + scan, + getName(), + this.connection, + this.rpcCallerFactory, + this.rpcControllerFactory, + pool, + tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { - return new ReversedClientScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ReversedClientScanner(getConfiguration(), + scan, + getName(), + this.connection, + this.rpcCallerFactory, + this.rpcControllerFactory, + pool, + tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } if (scan.isSmall()) { - return new ClientSmallScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ClientSmallScanner(getConfiguration(), + scan, + getName(), + this.connection, + this.rpcCallerFactory, + this.rpcControllerFactory, + pool, + tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { - return new ClientScanner(getConfiguration(), scan, getName(), this.connection, - this.rpcCallerFactory, this.rpcControllerFactory, - pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ClientScanner(getConfiguration(), + scan, + getName(), + this.connection, + this.rpcCallerFactory, + this.rpcControllerFactory, + pool, + tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } } /** - * The underlying {@link HTable} must not be closed. - * {@link HTableInterface#getScanner(byte[])} has other usage details. + * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[])} has other + * usage details. */ @Override - public ResultScanner getScanner(byte [] family) throws IOException { + public ResultScanner getScanner(byte[] family) throws IOException { Scan scan = new Scan(); scan.addFamily(family); return getScanner(scan); } /** - * The underlying {@link HTable} must not be closed. - * {@link HTableInterface#getScanner(byte[], byte[])} has other usage details. + * The underlying {@link HTable} must not be closed. {@link Table#getScanner(byte[], byte[])} has + * other usage details. */ @Override - public ResultScanner getScanner(byte [] family, byte [] qualifier) - throws IOException { + public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { Scan scan = new Scan(); scan.addColumn(family, qualifier); return getScanner(scan); @@ -828,39 +871,45 @@ public class HTable implements HTableInterface { */ @Override public Result get(final Get get) throws IOException { - if (get.getConsistency() == null){ + if (get.getConsistency() == null) { get.setConsistency(defaultConsistency); } if (get.getConsistency() == Consistency.STRONG) { // Good old call. - RegionServerCallable callable = new RegionServerCallable(this.connection, - getName(), get.getRow()) { - @Override - public Result call(int callTimeout) throws IOException { - ClientProtos.GetRequest request = - RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), get); - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - ClientProtos.GetResponse response = getStub().get(controller, request); - if (response == null) return null; - return ProtobufUtil.toResult(response.getResult()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; + RegionServerCallable callable = + new RegionServerCallable(this.connection, getName(), get.getRow()) { + @Override + public Result call(int callTimeout) throws IOException { + ClientProtos.GetRequest request = RequestConverter.buildGetRequest( + getLocation().getRegionInfo().getRegionName(), get); + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + ClientProtos.GetResponse response = getStub().get(controller, request); + if (response == null) { + return null; + } + return ProtobufUtil.toResult(response.getResult()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } // Call that takes into account the replica - RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( - rpcControllerFactory, tableName, this.connection, get, pool, - tableConfiguration.getRetriesNumber(), - operationTimeout, - tableConfiguration.getPrimaryCallTimeoutMicroSecond()); + RpcRetryingCallerWithReadReplicas callable = + new RpcRetryingCallerWithReadReplicas(rpcControllerFactory, + tableName, + this.connection, + get, + pool, + tableConfiguration.getRetriesNumber(), + operationTimeout, + tableConfiguration.getPrimaryCallTimeoutMicroSecond()); return callable.call(); } @@ -871,14 +920,14 @@ public class HTable implements HTableInterface { @Override public Result[] get(List gets) throws IOException { if (gets.size() == 1) { - return new Result[]{get(gets.get(0))}; + return new Result[] {get(gets.get(0))}; } try { - Object [] r1 = batch((List)gets); + Object[] r1 = batch((List) gets); // translate. - Result [] results = new Result[r1.length]; - int i=0; + Result[] results = new Result[r1.length]; + int i = 0; for (Object o : r1) { // batch ensures if there is a failure we get an exception instead results[i++] = (Result) o; @@ -886,7 +935,7 @@ public class HTable implements HTableInterface { return results; } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); + throw (InterruptedIOException) new InterruptedIOException().initCause(e); } } @@ -905,13 +954,14 @@ public class HTable implements HTableInterface { /** * {@inheritDoc} - * @deprecated If any exception is thrown by one of the actions, there is no way to - * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead. + * + * @deprecated If any exception is thrown by one of the actions, there is no way to retrieve the + * partially executed results. Use {@link #batch(List, Object[])} instead. */ @Deprecated @Override - public Object[] batch(final List actions) - throws InterruptedException, IOException { + public Object[] batch(final List actions) throws InterruptedException, + IOException { Object[] results = new Object[actions.size()]; batch(actions, results); return results; @@ -921,24 +971,22 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public void batchCallback( - final List actions, final Object[] results, final Batch.Callback callback) - throws IOException, InterruptedException { + public void batchCallback(final List actions, final Object[] results, + final Batch.Callback callback) throws IOException, InterruptedException { connection.processBatchCallback(actions, tableName, pool, results, callback); } /** * {@inheritDoc} - * @deprecated If any exception is thrown by one of the actions, there is no way to - * retrieve the partially executed results. Use - * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)} - * instead. + * + * @deprecated If any exception is thrown by one of the actions, there is no way to retrieve the + * partially executed results. Use {@link #batchCallback(List, Object[], + * org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)} instead. */ @Deprecated @Override - public Object[] batchCallback( - final List actions, final Batch.Callback callback) throws IOException, - InterruptedException { + public Object[] batchCallback(final List actions, + final Batch.Callback callback) throws IOException, InterruptedException { Object[] results = new Object[actions.size()]; batchCallback(actions, results, callback); return results; @@ -948,45 +996,44 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public void delete(final Delete delete) - throws IOException { - RegionServerCallable callable = new RegionServerCallable(connection, - tableName, delete.getRow()) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + public void delete(final Delete delete) throws IOException { + RegionServerCallable callable = + new RegionServerCallable(connection, tableName, delete.getRow()) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public void delete(final List deletes) - throws IOException { + public void delete(final List deletes) throws IOException { Object[] results = new Object[deletes.size()]; try { batch(deletes, results); } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); + throw (InterruptedIOException) new InterruptedIOException().initCause(e); } finally { // mutate list so that it is empty for complete success, or contains only failed records // results are returned in the same order as the requests in list - // walk the list backwards, so we can remove from list without impacting the indexes of earlier members - for (int i = results.length - 1; i>=0; i--) { + // walk the list backwards, so we can remove from list without impacting the indexes of + // earlier members + for (int i = results.length - 1; i >= 0; i--) { // if result is not null, it succeeded if (results[i] instanceof Result) { deletes.remove(i); @@ -999,9 +1046,9 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public void put(final Put put) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - doPut(put); + public void put(final Put put) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + this.mutator.put(put); if (autoFlush) { flushCommits(); } @@ -1011,77 +1058,11 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public void put(final List puts) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - for (Put put : puts) { - doPut(put); - } + public void put(final List puts) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + this.mutator.put(puts); if (autoFlush) { - flushCommits(); - } - } - - - /** - * Add the put to the buffer. If the buffer is already too large, sends the buffer to the - * cluster. - * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. - * @throws InterruptedIOException if we were interrupted. - */ - private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - writeAsyncBuffer.add(put); - backgroundFlushCommits(true); - } - - validatePut(put); - - currentWriteBufferSize += put.heapSize(); - writeAsyncBuffer.add(put); - - while (currentWriteBufferSize > writeBufferSize) { - backgroundFlushCommits(false); - } - } - - - /** - * Send the operations in the buffer to the servers. Does not wait for the server's answer. - * If the is an error (max retried reach from a previous flush or bad operation), it tries to - * send all operations in the buffer and sends an exception. - * @param synchronous - if true, sends all the writes and wait for all of them to finish before - * returning. - */ - private void backgroundFlushCommits(boolean synchronous) throws - InterruptedIOException, RetriesExhaustedWithDetailsException { - - try { - if (!synchronous) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - if (ap.hasError()) { - LOG.debug(tableName + ": One or more of the operations have failed -" + - " waiting for all operation in progress to finish (successfully or not)"); - } - } - if (synchronous || ap.hasError()) { - while (!writeAsyncBuffer.isEmpty()) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - } - RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); - if (error != null) { - throw error; - } - } - } finally { - currentWriteBufferSize = 0; - for (Row mut : writeAsyncBuffer) { - if (mut instanceof Mutation) { - currentWriteBufferSize += ((Mutation) mut).heapSize(); - } - } + mutator.flushCommits(); } } @@ -1092,25 +1073,25 @@ public class HTable implements HTableInterface { public void mutateRow(final RowMutations rm) throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, getName(), rm.getRow()) { - @Override - public Void call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( - getLocation().getRegionInfo().getRegionName(), rm); - regionMutationBuilder.setAtomic(true); - MultiRequest request = - MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - getStub().multi(controller, request); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - return null; - } - }; - rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + @Override + public Void call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( + getLocation().getRegionInfo().getRegionName(), rm); + regionMutationBuilder.setAtomic(true); + MultiRequest request = + MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); + getStub().multi(controller, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + return null; + } + }; + rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** @@ -1119,31 +1100,32 @@ public class HTable implements HTableInterface { @Override public Result append(final Append append) throws IOException { if (append.numFamilies() == 0) { - throw new IOException( - "Invalid arguments to append, no columns specified"); + throw new IOException("Invalid arguments to append, no columns specified"); } NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable callable = - new RegionServerCallable(this.connection, getName(), append.getRow()) { - @Override - public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - if (!response.hasResult()) return null; - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + new RegionServerCallable(this.connection, getName(), append.getRow()) { + @Override + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), append, nonceGroup, nonce); + MutateResponse response = getStub().mutate(controller, request); + if (!response.hasResult()) { + return null; + } + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** @@ -1152,38 +1134,36 @@ public class HTable implements HTableInterface { @Override public Result increment(final Increment increment) throws IOException { if (!increment.hasFamilies()) { - throw new IOException( - "Invalid arguments to increment, no columns specified"); + throw new IOException("Invalid arguments to increment, no columns specified"); } NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); - RegionServerCallable callable = new RegionServerCallable(this.connection, - getName(), increment.getRow()) { - @Override - public Result call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + RegionServerCallable callable = + new RegionServerCallable(this.connection, getName(), increment.getRow()) { + @Override + public Result call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildMutateRequest( + getLocation().getRegionInfo().getRegionName(), increment, nonceGroup, nonce); + MutateResponse response = getStub().mutate(controller, request); + return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public long incrementColumnValue(final byte [] row, final byte [] family, - final byte [] qualifier, final long amount) - throws IOException { + public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier, + final long amount) throws IOException { return incrementColumnValue(row, family, qualifier, amount, Durability.SYNC_WAL); } @@ -1192,20 +1172,18 @@ public class HTable implements HTableInterface { */ @Deprecated @Override - public long incrementColumnValue(final byte [] row, final byte [] family, - final byte [] qualifier, final long amount, final boolean writeToWAL) - throws IOException { + public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier, + final long amount, final boolean writeToWAL) throws IOException { return incrementColumnValue(row, family, qualifier, amount, - writeToWAL? Durability.SKIP_WAL: Durability.USE_DEFAULT); + writeToWAL ? Durability.SKIP_WAL : Durability.USE_DEFAULT); } /** * {@inheritDoc} */ @Override - public long incrementColumnValue(final byte [] row, final byte [] family, - final byte [] qualifier, final long amount, final Durability durability) - throws IOException { + public long incrementColumnValue(final byte[] row, final byte[] family, final byte[] qualifier, + final long amount, final Durability durability) throws IOException { NullPointerException npe = null; if (row == null) { npe = new NullPointerException("row is null"); @@ -1215,160 +1193,187 @@ public class HTable implements HTableInterface { npe = new NullPointerException("qualifier is null"); } if (npe != null) { - throw new IOException( - "Invalid arguments to incrementColumnValue", npe); + throw new IOException("Invalid arguments to incrementColumnValue", npe); } NonceGenerator ng = this.connection.getNonceGenerator(); final long nonceGroup = ng.getNonceGroup(), nonce = ng.newNonce(); RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { - @Override - public Long call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildIncrementRequest( - getLocation().getRegionInfo().getRegionName(), row, family, - qualifier, amount, durability, nonceGroup, nonce); - MutateResponse response = getStub().mutate(controller, request); - Result result = - ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + new RegionServerCallable(connection, getName(), row) { + @Override + public Long call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = RequestConverter.buildIncrementRequest(getLocation() + .getRegionInfo().getRegionName(), + row, + family, + qualifier, + amount, + durability, + nonceGroup, + nonce); + MutateResponse response = getStub().mutate(controller, request); + Result result = ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); + return Long.valueOf(Bytes.toLong(result.getValue(family, qualifier))); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndPut(final byte [] row, - final byte [] family, final byte [] qualifier, final byte [] value, - final Put put) - throws IOException { + public boolean checkAndPut(final byte[] row, final byte[] family, final byte[] qualifier, + final byte[] value, final Put put) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + new RegionServerCallable(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = + RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), + row, + family, + qualifier, + new BinaryComparator(value), + CompareType.EQUAL, + put); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndPut(final byte [] row, final byte [] family, - final byte [] qualifier, final CompareOp compareOp, final byte [] value, - final Put put) - throws IOException { + public boolean checkAndPut(final byte[] row, + final byte[] family, + final byte[] qualifier, + final CompareOp compareOp, + final byte[] value, + final Put put) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + new RegionServerCallable(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = new PayloadCarryingRpcController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = + RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), + row, + family, + qualifier, + new BinaryComparator(value), + compareType, + put); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndDelete(final byte [] row, - final byte [] family, final byte [] qualifier, final byte [] value, - final Delete delete) - throws IOException { + public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final byte[] value, final Delete delete) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), CompareType.EQUAL, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + new RegionServerCallable(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + MutateRequest request = + RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), + row, + family, + qualifier, + new BinaryComparator(value), + CompareType.EQUAL, + delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndDelete(final byte [] row, final byte [] family, - final byte [] qualifier, final CompareOp compareOp, final byte [] value, - final Delete delete) - throws IOException { + public boolean checkAndDelete(final byte[] row, + final byte[] family, + final byte[] qualifier, + final CompareOp compareOp, + final byte[] value, + final Delete delete) throws IOException { RegionServerCallable callable = - new RegionServerCallable(connection, getName(), row) { - @Override - public Boolean call(int callTimeout) throws IOException { - PayloadCarryingRpcController controller = rpcControllerFactory.newController(); - controller.setPriority(tableName); - controller.setCallTimeout(callTimeout); - try { - CompareType compareType = CompareType.valueOf(compareOp.name()); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); - MutateResponse response = getStub().mutate(controller, request); - return Boolean.valueOf(response.getProcessed()); - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + new RegionServerCallable(connection, getName(), row) { + @Override + public Boolean call(int callTimeout) throws IOException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + controller.setCallTimeout(callTimeout); + try { + CompareType compareType = CompareType.valueOf(compareOp.name()); + MutateRequest request = + RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), + row, + family, + qualifier, + new BinaryComparator(value), + compareType, + delete); + MutateResponse response = getStub().mutate(controller, request); + return Boolean.valueOf(response.getProcessed()); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } } - } - }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + }; + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** * {@inheritDoc} */ @Override - public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOp compareOp, final byte [] value, final RowMutations rm) - throws IOException { + public boolean checkAndMutate(final byte[] row, + final byte[] family, + final byte[] qualifier, + final CompareOp compareOp, + final byte[] value, + final RowMutations rm) throws IOException { RegionServerCallable callable = new RegionServerCallable(connection, getName(), row) { @Override @@ -1378,9 +1383,14 @@ public class HTable implements HTableInterface { controller.setCallTimeout(callTimeout); try { CompareType compareType = CompareType.valueOf(compareOp.name()); - MultiRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); + MultiRequest request = + RequestConverter.buildMutateRequest(getLocation().getRegionInfo().getRegionName(), + row, + family, + qualifier, + new BinaryComparator(value), + compareType, + rm); ClientProtos.MultiResponse response = getStub().multi(controller, request); return Boolean.valueOf(response.getProcessed()); } catch (ServiceException se) { @@ -1388,7 +1398,7 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory. newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.newCaller().callWithRetries(callable, this.operationTimeout); } /** @@ -1407,10 +1417,14 @@ public class HTable implements HTableInterface { */ @Override public boolean[] existsAll(final List gets) throws IOException { - if (gets.isEmpty()) return new boolean[]{}; - if (gets.size() == 1) return new boolean[]{exists(gets.get(0))}; + if (gets.isEmpty()) { + return new boolean[] {}; + } + if (gets.size() == 1) { + return new boolean[] {exists(gets.get(0))}; + } - for (Get g: gets){ + for (Get g : gets) { g.setCheckExistenceOnly(true); } @@ -1418,7 +1432,7 @@ public class HTable implements HTableInterface { try { r1 = batch(gets); } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); + throw (InterruptedIOException) new InterruptedIOException().initCause(e); } // translate. @@ -1426,7 +1440,7 @@ public class HTable implements HTableInterface { int i = 0; for (Object o : r1) { // batch ensures if there is a failure we get an exception instead - results[i++] = ((Result)o).getExists(); + results[i++] = ((Result) o).getExists(); } return results; @@ -1451,35 +1465,31 @@ public class HTable implements HTableInterface { */ @Override public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // As we can have an operation in progress even if the buffer is empty, we call - // backgroundFlushCommits at least one time. - backgroundFlushCommits(true); + this.mutator.flushCommits(); } /** - * Process a mixed batch of Get, Put and Delete actions. All actions for a - * RegionServer are forwarded in one RPC call. Queries are executed in parallel. + * Process a mixed batch of Get, Put and Delete actions. All actions for a RegionServer are + * forwarded in one RPC call. Queries are executed in parallel. * * @param list The collection of actions. - * @param results An empty array, same size as list. If an exception is thrown, - * you can test here for partial results, and to determine which actions - * processed successfully. - * @throws IOException if there are problems talking to META. Per-item - * exceptions are stored in the results array. - */ - public void processBatchCallback( - final List list, final Object[] results, final Batch.Callback callback) - throws IOException, InterruptedException { + * @param results An empty array, same size as list. If an exception is thrown, you can test here + * for partial results, and to determine which actions processed successfully. + * @throws IOException if there are problems talking to META. Per-item exceptions are stored in + * the results array. + */ + public void processBatchCallback(final List list, final Object[] results, + final Batch.Callback callback) throws IOException, InterruptedException { this.batchCallback(list, results, callback); } /** - * Parameterized batch processing, allowing varying return types for different - * {@link Row} implementations. + * Parameterized batch processing, allowing varying return types for different {@link Row} + * implementations. */ public void processBatch(final List list, final Object[] results) - throws IOException, InterruptedException { + throws IOException, InterruptedException { this.batch(list, results); } @@ -1511,11 +1521,6 @@ public class HTable implements HTableInterface { } // validate for well-formedness - public void validatePut(final Put put) throws IllegalArgumentException { - validatePut(put, tableConfiguration.getMaxKeyValueSize()); - } - - // validate for well-formedness public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException { if (put.isEmpty()) { throw new IllegalArgumentException("No columns to insert"); @@ -1567,33 +1572,32 @@ public class HTable implements HTableInterface { /** * Returns the maximum size in bytes of the write buffer for this HTable. *

- * The default value comes from the configuration parameter - * {@code hbase.client.write.buffer}. + * The default value comes from the configuration parameter {@code hbase.client.write.buffer}. + * * @return The size of the write buffer in bytes. */ @Override public long getWriteBufferSize() { - return writeBufferSize; + return this.mutator.getWriteBufferSize(); } /** * Sets the size of the buffer in bytes. *

- * If the new size is less than the current amount of data in the - * write buffer, the buffer gets flushed. + * If the new size is less than the current amount of data in the write buffer, the buffer gets + * flushed. + * * @param writeBufferSize The new write buffer size, in bytes. * @throws IOException if a remote or network exception occurs. */ @Override public void setWriteBufferSize(long writeBufferSize) throws IOException { - this.writeBufferSize = writeBufferSize; - if(currentWriteBufferSize > writeBufferSize) { - flushCommits(); - } + this.mutator.setWriteBufferSize(writeBufferSize); } /** * The pool is used for mutli requests for this HTable + * * @return the pool used for mutli */ ExecutorService getPool() { @@ -1601,66 +1605,58 @@ public class HTable implements HTableInterface { } /** - * Enable or disable region cache prefetch for the table. It will be - * applied for the given table's all HTable instances who share the same - * connection. By default, the cache prefetch is enabled. + * Enable or disable region cache prefetch for the table. It will be applied for the given table's + * all HTable instances who share the same connection. By default, the cache prefetch is enabled. + * * @param tableName name of table to configure. - * @param enable Set to true to enable region cache prefetch. Or set to - * false to disable it. + * @param enable Set to true to enable region cache prefetch. Or set to false to disable it. * @throws IOException * @deprecated does nothing since 0.99 */ @Deprecated - public static void setRegionCachePrefetch(final byte[] tableName, - final boolean enable) throws IOException { - } + public static void setRegionCachePrefetch(final byte[] tableName, final boolean enable) + throws IOException {} /** * @deprecated does nothing since 0.99 */ @Deprecated - public static void setRegionCachePrefetch( - final TableName tableName, - final boolean enable) throws IOException { - } + public static void setRegionCachePrefetch(final TableName tableName, final boolean enable) + throws IOException {} /** - * Enable or disable region cache prefetch for the table. It will be - * applied for the given table's all HTable instances who share the same - * connection. By default, the cache prefetch is enabled. + * Enable or disable region cache prefetch for the table. It will be applied for the given table's + * all HTable instances who share the same connection. By default, the cache prefetch is enabled. + * * @param conf The Configuration object to use. * @param tableName name of table to configure. - * @param enable Set to true to enable region cache prefetch. Or set to - * false to disable it. + * @param enable Set to true to enable region cache prefetch. Or set to false to disable it. * @throws IOException * @deprecated does nothing since 0.99 */ @Deprecated - public static void setRegionCachePrefetch(final Configuration conf, - final byte[] tableName, final boolean enable) throws IOException { - } + public static void setRegionCachePrefetch(final Configuration conf, final byte[] tableName, + final boolean enable) throws IOException {} /** * @deprecated does nothing since 0.99 */ @Deprecated - public static void setRegionCachePrefetch(final Configuration conf, - final TableName tableName, - final boolean enable) throws IOException { - } + public static void setRegionCachePrefetch(final Configuration conf, final TableName tableName, + final boolean enable) throws IOException {} /** * Check whether region cache prefetch is enabled or not for the table. + * * @param conf The Configuration object to use. * @param tableName name of table to check - * @return true if table's region cache prefecth is enabled. Otherwise - * it is disabled. + * @return true if table's region cache prefecth is enabled. Otherwise it is disabled. * @throws IOException * @deprecated always return false since 0.99 */ @Deprecated - public static boolean getRegionCachePrefetch(final Configuration conf, - final byte[] tableName) throws IOException { + public static boolean getRegionCachePrefetch(final Configuration conf, final byte[] tableName) + throws IOException { return false; } @@ -1668,16 +1664,16 @@ public class HTable implements HTableInterface { * @deprecated always return false since 0.99 */ @Deprecated - public static boolean getRegionCachePrefetch(final Configuration conf, - final TableName tableName) throws IOException { + public static boolean getRegionCachePrefetch(final Configuration conf, final TableName tableName) + throws IOException { return false; } /** * Check whether region cache prefetch is enabled or not for the table. + * * @param tableName name of table to check - * @return true if table's region cache prefecth is enabled. Otherwise - * it is disabled. + * @return true if table's region cache prefecth is enabled. Otherwise it is disabled. * @throws IOException * @deprecated always return false since 0.99 */ @@ -1690,14 +1686,13 @@ public class HTable implements HTableInterface { * @deprecated always return false since 0.99 */ @Deprecated - public static boolean getRegionCachePrefetch( - final TableName tableName) throws IOException { + public static boolean getRegionCachePrefetch(final TableName tableName) throws IOException { return false; } /** - * Explicitly clears the region cache to fetch the latest value from META. - * This is a power user function: avoid unless you know the ramifications. + * Explicitly clears the region cache to fetch the latest value from META. This is a power user + * function: avoid unless you know the ramifications. */ public void clearRegionCache() { this.connection.clearRegionCache(); @@ -1715,11 +1710,11 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public Map coprocessorService(final Class service, - byte[] startKey, byte[] endKey, final Batch.Call callable) - throws ServiceException, Throwable { - final Map results = Collections.synchronizedMap( - new TreeMap(Bytes.BYTES_COMPARATOR)); + public Map coprocessorService(final Class service, + byte[] startKey, byte[] endKey, final Batch.Call callable) throws ServiceException, + Throwable { + final Map results = + Collections.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); coprocessorService(service, startKey, endKey, callable, new Batch.Callback() { @Override public void update(byte[] region, byte[] row, R value) { @@ -1735,34 +1730,32 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public void coprocessorService(final Class service, - byte[] startKey, byte[] endKey, final Batch.Call callable, - final Batch.Callback callback) throws ServiceException, Throwable { + public void coprocessorService(final Class service, byte[] startKey, + byte[] endKey, final Batch.Call callable, final Batch.Callback callback) + throws ServiceException, Throwable { // get regions covered by the row range List keys = getStartKeysInRange(startKey, endKey); - Map> futures = - new TreeMap>(Bytes.BYTES_COMPARATOR); + Map> futures = new TreeMap>(Bytes.BYTES_COMPARATOR); for (final byte[] r : keys) { final RegionCoprocessorRpcChannel channel = new RegionCoprocessorRpcChannel(connection, tableName, r); - Future future = pool.submit( - new Callable() { - @Override - public R call() throws Exception { - T instance = ProtobufUtil.newServiceStub(service, channel); - R result = callable.call(instance); - byte[] region = channel.getLastRegion(); - if (callback != null) { - callback.update(region, r, result); - } - return result; - } - }); + Future future = pool.submit(new Callable() { + @Override + public R call() throws Exception { + T instance = ProtobufUtil.newServiceStub(service, channel); + R result = callable.call(instance); + byte[] region = channel.getLastRegion(); + if (callback != null) { + callback.update(region, r, result); + } + return result; + } + }); futures.put(r, future); } - for (Map.Entry> e : futures.entrySet()) { + for (Map.Entry> e : futures.entrySet()) { try { e.getValue().get(); } catch (ExecutionException ee) { @@ -1770,15 +1763,13 @@ public class HTable implements HTableInterface { + Bytes.toStringBinary(e.getKey()), ee); throw ee.getCause(); } catch (InterruptedException ie) { - throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName() - + " for row " + Bytes.toStringBinary(e.getKey())) - .initCause(ie); + throw new InterruptedIOException("Interrupted calling coprocessor service " + + service.getName() + " for row " + Bytes.toStringBinary(e.getKey())).initCause(ie); } } } - private List getStartKeysInRange(byte[] start, byte[] end) - throws IOException { + private List getStartKeysInRange(byte[] start, byte[] end) throws IOException { if (start == null) { start = HConstants.EMPTY_START_ROW; } @@ -1806,11 +1797,15 @@ public class HTable implements HTableInterface { */ @Override public Map batchCoprocessorService( - Descriptors.MethodDescriptor methodDescriptor, Message request, - byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { - final Map results = Collections.synchronizedMap(new TreeMap( - Bytes.BYTES_COMPARATOR)); - batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype, + Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, + byte[] endKey, R responsePrototype) throws ServiceException, Throwable { + final Map results = + Collections.synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); + batchCoprocessorService(methodDescriptor, + request, + startKey, + endKey, + responsePrototype, new Callback() { @Override @@ -1827,10 +1822,13 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public void batchCoprocessorService( - final Descriptors.MethodDescriptor methodDescriptor, final Message request, - byte[] startKey, byte[] endKey, final R responsePrototype, final Callback callback) - throws ServiceException, Throwable { + public < + R extends Message> void batchCoprocessorService(final Descriptors.MethodDescriptor methodDescriptor, + final Message request, + byte[] startKey, + byte[] endKey, + final R responsePrototype, + final Callback callback) throws ServiceException, Throwable { // get regions covered by the row range Pair, List> keysAndRegions = @@ -1840,8 +1838,8 @@ public class HTable implements HTableInterface { // check if we have any calls to make if (keys.isEmpty()) { - LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + - ", end=" + Bytes.toStringBinary(endKey)); + LOG.info("No regions were selected by key range start=" + Bytes.toStringBinary(startKey) + + ", end=" + Bytes.toStringBinary(endKey)); return; } @@ -1864,26 +1862,26 @@ public class HTable implements HTableInterface { final List callbackErrorServers = new ArrayList(); Object[] results = new Object[execs.size()]; - AsyncProcess asyncProcess = - new AsyncProcess(connection, configuration, pool, - RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + AsyncProcess asyncProcess = new AsyncProcess(connection, + configuration, + pool, + RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), + true, + RpcControllerFactory.instantiate(configuration)); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback() { @Override public void update(byte[] region, byte[] row, - ClientProtos.CoprocessorServiceResult serviceResult) { + ClientProtos.CoprocessorServiceResult serviceResult) { if (LOG.isTraceEnabled()) { - LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + - ": region=" + Bytes.toStringBinary(region) + - ", row=" + Bytes.toStringBinary(row) + - ", value=" + serviceResult.getValue().getValue()); + LOG.trace("Received result for endpoint " + methodDescriptor.getFullName() + + ": region=" + Bytes.toStringBinary(region) + ", row=" + + Bytes.toStringBinary(row) + ", value=" + serviceResult.getValue().getValue()); } try { - callback.update(region, row, - (R) responsePrototype.newBuilderForType().mergeFrom( - serviceResult.getValue().getValue()).build()); + callback.update(region, row, (R) responsePrototype.newBuilderForType() + .mergeFrom(serviceResult.getValue().getValue()).build()); } catch (InvalidProtocolBufferException e) { LOG.error("Unexpected response type from endpoint " + methodDescriptor.getFullName(), e); diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java new file mode 100644 index 0000000..1c51887 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.client.HBulkMutator; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * An implementation of {@link Lock} that doesn't actually lock anything. {@link HBulkMutator} uses + * a Lock, but there are cases + * + */ +public class DoNothingLock implements Lock { + + @Override + public void lock() { + } + + @Override + public void lockInterruptibly() throws InterruptedException { + } + + @Override + public boolean tryLock() { + return false; + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public void unlock() { + } + + @Override + public Condition newCondition() { + return null; + } +} diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 88a95fb..1fffd86 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -155,8 +155,8 @@ public class TestAsyncProcess { new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); } - public MyAsyncProcess( - ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) { + public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, + @SuppressWarnings("unused") boolean dummy) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) { @Override @@ -649,7 +649,7 @@ public class TestAsyncProcess { @Test public void testHTablePutSuccess() throws Exception { - HTable ht = Mockito.mock(HTable.class); + HBulkMutator ht = Mockito.mock(HBulkMutator.class); ht.ap = new MyAsyncProcess(createHConnection(), conf, true); Put put = createPut(1, true); @@ -662,7 +662,7 @@ public class TestAsyncProcess { private void doHTableFailedPut(boolean bufferOn) throws Exception { HTable ht = new HTable(); MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; + ht.mutator.ap = ap; ht.setAutoFlushTo(true); if (bufferOn) { ht.setWriteBufferSize(1024L * 1024L); @@ -672,7 +672,7 @@ public class TestAsyncProcess { Put put = createPut(1, false); - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); try { ht.put(put); if (bufferOn) { @@ -681,7 +681,7 @@ public class TestAsyncProcess { Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { @@ -710,7 +710,7 @@ public class TestAsyncProcess { public void testHTableFailedPutAndNewPut() throws Exception { HTable ht = new HTable(); MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; + ht.mutator.ap = ap; ht.setAutoFlushTo(false); ht.setWriteBufferSize(0); @@ -725,13 +725,15 @@ public class TestAsyncProcess { // puts, we may raise an exception in the middle of the list. It's then up to the caller to // manage what was inserted, what was tried but failed, and what was not even tried. p = createPut(1, true); - Assert.assertEquals(0, ht.writeAsyncBuffer.size()); + Assert.assertEquals(0, ht.mutator.getWriteBuffer().size()); try { ht.put(p); Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size()); + Assert.assertEquals("the put should not been inserted.", 0, ht.mutator.getWriteBuffer().size()); + + ht.close(); } @@ -801,11 +803,11 @@ public class TestAsyncProcess { ht.connection = new MyConnectionImpl(configuration); MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true); - ht.ap = ap; + ht.mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); - Assert.assertTrue(ht.ap.serverTrackerTimeout > 200); - ht.ap.serverTrackerTimeout = 1; + Assert.assertNotNull(ht.mutator.ap.createServerErrorTracker()); + Assert.assertTrue(ht.mutator.ap.serverTrackerTimeout > 200); + ht.mutator.ap.serverTrackerTimeout = 1; Put p = createPut(1, false); ht.setAutoFlushTo(false); @@ -818,6 +820,7 @@ public class TestAsyncProcess { } // Checking that the ErrorsServers came into play and didn't make us stop immediately Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); + ht.close(); } @Test @@ -825,9 +828,9 @@ public class TestAsyncProcess { HTable ht = new HTable(); ht.connection = new MyConnectionImpl(conf); AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf); - ht.ap = ap; + ht.mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); + Assert.assertNotNull(ht.mutator.ap.createServerErrorTracker()); Put p = createPut(1, true); ht.setAutoFlushTo(false); @@ -840,6 +843,7 @@ public class TestAsyncProcess { } // Checking that the ErrorsServers came into play and didn't make us stop immediately Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); + ht.close(); } /** @@ -867,7 +871,7 @@ public class TestAsyncProcess { MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); ht.multiAp = ap; - ht.batch(gets); + ht.batch(gets, new Object[gets.size()]); Assert.assertEquals(ap.nbActions.get(), NB_REGS); Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get());