From df211e2c0403687866441ffb37f8e57ac63a95d4 Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Wed, 21 Jan 2015 16:37:27 -0800
Subject: [PATCH] HBASE-12728 buffered writes substantially less useful after removal of HTablePool (Solomon Duskis and Nick Dimiduk)

In our pre-1.0 API, HTable is considered a light-weight object that is consumed by a single thread at a time. The HTablePool class provided a means of sharing multiple HTable instances across a number of threads. As an optimization, HTable managed a "write buffer", accumulating edits and sending a "batch" all at once. By default the batch was sent as the last step in invocations of put(Put) and put(List<Put>). The user could disable the automatic flushing of the write buffer, retaining edits locally and only sending the whole "batch" once the write buffer had filled or when the flushCommits() method was invoked explicitly.

Explicit or implicit batch writing was controlled by the setAutoFlushTo(boolean) method. A value of true (the default) had the write buffer flushed at the completion of a call to put(Put) or put(List<Put>). A value of false allowed for explicit buffer management. HTable also exposed the buffer to consumers via getWriteBuffer(). The combination of HTable with setAutoFlushTo(false) and the HTablePool provided a convenient mechanism by which multiple "Put-producing" threads could share a common write buffer.

Both HTablePool and HTable are deprecated, and they are officially replaced in the new 1.0 API by Table and BufferedMutator. Table, which replaces HTable, no longer exposes explicit write-buffer management. Instead, explicit buffer management is exposed via BufferedMutator. BufferedMutator is made safe for concurrent use. Where code would previously retrieve and return HTables from an HTablePool, now that code creates and shares a single BufferedMutator instance across all threads.
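For illustration only (this snippet is not part of the patch), a minimal before/after sketch of the migration described above. The class name BufferedMutatorMigration, the table name "example", and the column coordinates are placeholders; the old pattern appears only in comments because HTablePool has been removed from the 1.0 API.

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedMutatorMigration {
      public static void main(String[] args) throws IOException {
        // Before (pre-1.0, deprecated): buffer writes by turning off auto-flush on an
        // HTable checked out of an HTablePool.
        //   HTableInterface t = pool.getTable("example");  // pool: a shared HTablePool
        //   t.setAutoFlushTo(false);   // keep edits in the client-side write buffer
        //   t.put(somePut);            // buffered locally
        //   t.flushCommits();          // send the whole batch
        //   t.close();                 // return the table to the pool

        // After (1.0): one thread-safe BufferedMutator, created once and shared by all
        // writer threads; close() flushes any remaining buffered edits.
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("example"))) {
          Put put = new Put(Bytes.toBytes("row-1"));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          mutator.mutate(put);  // buffered and sent asynchronously as the buffer fills
          mutator.flush();      // optional explicit flush, analogous to flushCommits()
        }
      }
    }

The BufferedMutatorExample added by this patch in hbase-examples shows the same idea with a custom ExceptionListener and a worker pool.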
Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java --- .../hadoop/hbase/client/BufferedMutator.java | 129 +++++++++++ .../hbase/client/BufferedMutatorBuilder.java | 154 +++++++++++++ .../hadoop/hbase/client/BufferedMutatorImpl.java | 254 +++++++++++++++++++++ .../org/apache/hadoop/hbase/client/Connection.java | 31 +++ .../hadoop/hbase/client/ConnectionAdapter.java | 11 + .../hadoop/hbase/client/ConnectionManager.java | 24 +- .../org/apache/hadoop/hbase/client/HTable.java | 137 ++++------- .../hadoop/hbase/client/HTableInterface.java | 64 +++++- .../java/org/apache/hadoop/hbase/client/Table.java | 32 +-- .../hadoop/hbase/client/TableConfiguration.java | 20 +- .../hadoop/hbase/client/TestAsyncProcess.java | 96 ++++---- .../hadoop/hbase/client/TestClientNoCluster.java | 49 ++-- .../client/example/BufferedMutatorExample.java | 119 ++++++++++ .../hbase/test/IntegrationTestBigLinkedList.java | 25 +- ...IntegrationTestBigLinkedListWithVisibility.java | 9 +- .../hbase/test/IntegrationTestLoadAndVerify.java | 44 ++-- 
...grationTestWithCellVisibilityLoadAndVerify.java | 4 +- .../trace/IntegrationTestSendTraceRequests.java | 8 +- .../org/apache/hadoop/hbase/rest/RowResource.java | 3 - .../hadoop/hbase/rest/PerformanceEvaluation.java | 103 +++++---- .../hadoop/hbase/rest/client/TestRemoteTable.java | 15 +- .../apache/hadoop/hbase/client/HTableWrapper.java | 8 +- .../hadoop/hbase/mapred/TableOutputFormat.java | 19 +- .../hbase/mapreduce/MultiTableOutputFormat.java | 35 +-- .../hadoop/hbase/mapreduce/TableOutputFormat.java | 28 ++- .../apache/hadoop/hbase/PerformanceEvaluation.java | 82 +++++-- .../hbase/client/TestCloneSnapshotFromClient.java | 37 ++- .../hadoop/hbase/client/TestFromClientSide.java | 32 +-- .../hadoop/hbase/client/TestMultiParallel.java | 32 +-- .../client/TestRestoreSnapshotFromClient.java | 16 +- .../hbase/client/TestRpcControllerFactory.java | 10 +- .../hbase/coprocessor/TestHTableWrapper.java | 6 +- .../hbase/master/TestDistributedLogSplitting.java | 10 +- .../org/apache/hadoop/hbase/master/TestMaster.java | 10 +- .../regionserver/TestEndToEndSplitTransaction.java | 6 +- .../hbase/regionserver/TestFSErrorsExposed.java | 32 ++- .../hbase/regionserver/TestRegionFavoredNodes.java | 1 + .../regionserver/TestRegionServerMetrics.java | 91 ++++---- .../regionserver/TestScannerWithBulkload.java | 8 +- .../hbase/regionserver/wal/TestLogRolling.java | 6 +- .../TestReplicationChangingPeerRegionservers.java | 7 +- .../replication/TestReplicationSmallTests.java | 3 +- .../hbase/replication/TestReplicationWithTags.java | 5 +- .../TestVisibilityLabelsReplication.java | 29 +-- .../hbase/snapshot/SnapshotTestingUtils.java | 29 ++- .../snapshot/TestFlushSnapshotFromClient.java | 16 +- .../TestRestoreFlushSnapshotFromClient.java | 8 +- 47 files changed, 1322 insertions(+), 575 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorBuilder.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java new file mode 100644 index 0000000..606e765 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -0,0 +1,129 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + *

Used to communicate with a single HBase table similar to {@link Table} but meant for + * batched, asynchronous puts. Obtain an instance from a {@link Connection} and call + * {@link #close()} afterwards. Customizations can be applied to the {@code BufferedMutator} via + * the {@link BufferedMutatorBuilder}. + *

+ * + *

Exception handling is done asynchronously via the {@link BufferedMutator.ExceptionListener}. + * The default implementation is to throw the exception upon receipt. This behavior can be + * overridden with a custom implementation, provided as a parameter with + * {@link BufferedMutatorBuilder#listener(BufferedMutator.ExceptionListener)}.

+ * + *

Map/Reduce jobs are good use cases for using {@code BufferedMutator}. Map/reduce jobs + * benefit from batching, but have no natural flush point. {@code BufferedMutator} receives the + * puts from the M/R job and will batch puts based on some heuristic, such as the accumulated size + * of the puts, and submit batches of puts asynchronously so that the M/R logic can continue + * without interruption. + *

+ * + *

{@code BufferedMutator} can also be used in more exotic circumstances. Map/Reduce batch jobs + * will have a single {@code BufferedMutator} per thread. A single {@code BufferedMutator} can + * also be used effectively in high-volume online systems to batch puts, with the caveat that + * extreme circumstances, such as JVM or machine failure, may cause some data loss.

+ * + *

NOTE: This class replaces the functionality that used to be available via + * {@link HTableInterface#setAutoFlush(boolean)} set to {@code false}. + *

+ * + *

See also the {@code BufferedMutatorExample} in the hbase-examples module.

+ * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface BufferedMutator extends Closeable { + /** + * Gets the fully qualified table name instance of the table that this BufferedMutator writes to. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *

+ * The reference returned is not a copy, so any change made to it will + * affect this instance. + */ + Configuration getConfiguration(); + + /** + * Sends a {@link Mutation} to the table. The mutations will be buffered and sent over the + * wire as part of a batch. Currently only supports {@link Put} and {@link Delete} mutations. + * + * @param mutation The data to send. + * @throws IOException if a remote or network exception occurs. + */ + void mutate(Mutation mutation) throws IOException; + + /** + * Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the + * wire as part of a batch. There is no guarantee of sending entire content of {@code mutations} + * in a single batch; it will be broken up according to the write buffer capacity. + * + * @param mutations The data to send. + * @throws IOException if a remote or network exception occurs. + */ + void mutate(List mutations) throws IOException; + + /** + * Performs a {@link #flush()} and releases any resources held. + * + * @throws IOException if a remote or network exception occurs. + */ + @Override + void close() throws IOException; + + /** + * Executes all the buffered, asynchronous {@link Mutation} operations and waits until they + * are done. + * + * @throws IOException if a remote or network exception occurs. + */ + void flush() throws IOException; + + /** + * Returns the maximum size in bytes of the write buffer for this HTable. + *

+ * The default value comes from the configuration parameter {@code hbase.client.write.buffer}. + * @return The size of the write buffer in bytes. + */ + long getWriteBufferSize(); + + /** + * Listens for asynchronous exceptions on a {@link BufferedMutator}. + */ + interface ExceptionListener { + public void onException(RetriesExhaustedWithDetailsException exception, + BufferedMutator mutator) throws RetriesExhaustedWithDetailsException; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorBuilder.java new file mode 100644 index 0000000..d310ce5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorBuilder.java @@ -0,0 +1,154 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.TableConfiguration.MAX_KEYVALUE_SIZE_KEY; +import static org.apache.hadoop.hbase.client.TableConfiguration.MAX_KEYVALUE_SIZE_DEFAULT; +import static org.apache.hadoop.hbase.client.TableConfiguration.WRITE_BUFFER_SIZE_KEY; +import static org.apache.hadoop.hbase.client.TableConfiguration.WRITE_BUFFER_SIZE_DEFAULT; + +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +/** + * Parameters for instantiating a {@link BufferedMutator}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class BufferedMutatorBuilder { + + private static final int UNSET = -1; + + private final TableName tableName; + private long writeBufferSize = UNSET; + private int maxKeyValueSize = UNSET; + private ExecutorService pool = null; + private ClusterConnection connection = null; + private RpcRetryingCallerFactory rpcRetryingCallerFactory = null; + private RpcControllerFactory rpcControllerFactory = null; + private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException exception, + BufferedMutator bufferedMutator) + throws RetriesExhaustedWithDetailsException { + throw exception; + } + }; + + public BufferedMutatorBuilder(TableName tableName) { + this.tableName = tableName; + } + + public TableName getTableName() { + return tableName; + } + + /** + * Override the write buffer size specified by the provided {@link ClusterConnection}'s + * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key + * {@code hbase.client.write.buffer}. 
+ */ + public long getWriteBufferSize() { + if (writeBufferSize != UNSET) return writeBufferSize; + return connection != null ? + connection.getConfiguration().getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT) : + writeBufferSize; + } + + public BufferedMutatorBuilder writeBufferSize(long writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } + + /** + * Override the maximum key-value size specified by the provided {@link ClusterConnection}'s + * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key + * {@code hbase.client.keyvalue.maxsize}. + */ + public int getMaxKeyValueSize() { + if (maxKeyValueSize != UNSET) return maxKeyValueSize; + return connection != null ? + connection.getConfiguration().getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT) : + maxKeyValueSize; + } + + public BufferedMutatorBuilder maxKeyValueSize(int maxKeyValueSize) { + this.maxKeyValueSize = maxKeyValueSize; + return this; + } + + public ExecutorService getPool() { + return pool; + } + + public BufferedMutatorBuilder pool(ExecutorService pool) { + this.pool = pool; + return this; + } + + public BufferedMutator.ExceptionListener getListener() { + return listener; + } + + public BufferedMutatorBuilder listener(BufferedMutator.ExceptionListener listener) { + this.listener = listener; + return this; + } + + /* + * the remaining getter/setters are for HTable implementation details and are intentionally + * package-private + */ + + ClusterConnection getConnection() { + return connection; + } + + BufferedMutatorBuilder connection(ClusterConnection connection) { + this.connection = connection; + return this; + } + + RpcRetryingCallerFactory getRpcRetryingCallerFactory() { + return rpcRetryingCallerFactory; + } + + BufferedMutatorBuilder rpcRetryingCallerFactory(RpcRetryingCallerFactory factory) { + this.rpcRetryingCallerFactory = factory; + return this; + } + + RpcControllerFactory getRpcControllerFactory() { + return rpcControllerFactory; + } + + BufferedMutatorBuilder rpcControllerFactory(RpcControllerFactory factory) { + this.rpcControllerFactory = factory; + return this; + } + + public BufferedMutator build() { + return new BufferedMutatorImpl(this); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java new file mode 100644 index 0000000..67ac3b2 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -0,0 +1,254 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +/** + *

+ * Used to communicate with a single HBase table similar to {@link HTable} + * but meant for batched, potentially asynchronous puts. Obtain an instance from + * a {@link Connection} and call {@link #close()} afterwards. + *

+ * + * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class BufferedMutatorImpl implements BufferedMutator { + + private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); + + private final ExceptionListener listener; + + protected ClusterConnection connection; // non-final so can be overridden in test + private final TableName tableName; + private volatile Configuration conf; + private List writeAsyncBuffer = new LinkedList<>(); + private long writeBufferSize; + private final int maxKeyValueSize; + protected long currentWriteBufferSize = 0; + private boolean closed = false; + private final ExecutorService pool; + protected AsyncProcess ap; // non-final so can be overridden in test + + BufferedMutatorImpl(BufferedMutatorBuilder builder) { + if (builder.getConnection() == null || builder.getConnection().isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + + this.tableName = builder.getTableName(); + this.connection = builder.getConnection(); + this.conf = connection.getConfiguration(); + this.pool = builder.getPool(); + this.listener = builder.getListener(); + + this.writeBufferSize = builder.getWriteBufferSize(); + this.maxKeyValueSize = builder.getMaxKeyValueSize(); + + // puts need to track errors globally due to how the APIs currently work. + ap = new AsyncProcess(connection, conf, pool, builder.getRpcRetryingCallerFactory(), + true, builder.getRpcControllerFactory()); + } + + @Override + public TableName getName() { + return tableName; + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public synchronized void mutate(Mutation m) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + doMutate(m); + } + + @Override + public synchronized void mutate(List ms) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + for (Mutation m : ms) { + doMutate(m); + } + } + + /** + * Add the put to the buffer. If the buffer is already too large, sends the buffer to the + * cluster. + * + * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. + * @throws InterruptedIOException if we were interrupted. + */ + private void doMutate(Mutation m) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + if (closed) { + throw new IllegalStateException("Cannot put when the BufferedMutator is closed."); + } + if (!(m instanceof Put) && !(m instanceof Delete)) { + throw new IllegalArgumentException("Pass a Delete or a Put"); + } + + // This behavior is highly non-intuitive... it does not protect us against + // 94-incompatible behavior, which is a timing issue because hasError, the below code + // and setter of hasError are not synchronized. Perhaps it should be removed. 
+ if (ap.hasError()) { + writeAsyncBuffer.add(m); + backgroundFlushCommits(true); + } + + if (m instanceof Put) { + validatePut((Put) m); + } + + currentWriteBufferSize += m.heapSize(); + writeAsyncBuffer.add(m); + + while (currentWriteBufferSize > writeBufferSize) { + backgroundFlushCommits(false); + } + } + + // validate for well-formedness + public void validatePut(final Put put) throws IllegalArgumentException { + HTable.validatePut(put, maxKeyValueSize); + } + + @Override + public synchronized void close() throws IOException { + if (this.closed) { + return; + } + try { + // As we can have an operation in progress even if the buffer is empty, we call + // backgroundFlushCommits at least one time. + backgroundFlushCommits(true); + this.pool.shutdown(); + boolean terminated = false; + int loopCnt = 0; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + loopCnt += 1; + if (loopCnt >= 10) { + LOG.warn("close() failed to terminate pool after 10 minutes. Abandoning pool."); + break; + } + } while (!terminated); + } catch (InterruptedException e) { + LOG.warn("waitForTermination interrupted"); + } finally { + this.closed = true; + } + } + + @Override + public synchronized void flush() throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + // As we can have an operation in progress even if the buffer is empty, we call + // backgroundFlushCommits at least one time. + backgroundFlushCommits(true); + } + + /** + * Send the operations in the buffer to the servers. Does not wait for the server's answer. If + * the is an error (max retried reach from a previous flush or bad operation), it tries to send + * all operations in the buffer and sends an exception. + * + * @param synchronous - if true, sends all the writes and wait for all of them to finish before + * returning. + */ + private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + try { + if (!synchronous) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + if (ap.hasError()) { + LOG.debug(tableName + ": One or more of the operations have failed -" + + " waiting for all operation in progress to finish (successfully or not)"); + } + } + if (synchronous || ap.hasError()) { + while (!writeAsyncBuffer.isEmpty()) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + } + RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); + if (error != null) { + if (listener == null) { + throw error; + } else { + this.listener.onException(error, this); + } + } + } + } finally { + currentWriteBufferSize = 0; + for (Row mut : writeAsyncBuffer) { + if (mut instanceof Mutation) { + currentWriteBufferSize += ((Mutation) mut).heapSize(); + } + } + } + } + + /** + * This is used for legacy purposes in {@link HTable#setWriteBufferSize(long)} only. This ought + * not be called for production uses. + * @deprecated Going away when we drop public support for {@link HTableInterface}. + */ + @Deprecated + public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, + InterruptedIOException { + this.writeBufferSize = writeBufferSize; + if (currentWriteBufferSize > writeBufferSize) { + flush(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getWriteBufferSize() { + return this.writeBufferSize; + } + + /** + * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. 
This should not be + * called from production uses. + * @deprecated Going away when we drop public support for {@link HTableInterface}. + */ + @Deprecated + public List getWriteBuffer() { + return this.writeAsyncBuffer; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index 92b3f04..2053560 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -99,6 +99,37 @@ public interface Connection extends Abortable, Closeable { Table getTable(TableName tableName, ExecutorService pool) throws IOException; /** + *

+ * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The + * {@link BufferedMutator} returned by this method is thread-safe. This BufferedMutator will + * use the Connection's ExecutorService. This object can be used for long lived operations. + *

+ *

+ * The caller is responsible for calling {@link BufferedMutator#close()} on + * the returned {@link BufferedMutator} instance. + *

+ *

+ * This accessor will use the connection's ExecutorService and will throw an + * exception in the main thread when an asynchronous exception occurs. + * + * @param tableName the name of the table + * + * @return a {@link BufferedMutator} for the supplied tableName. + */ + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException; + + /** + * Retrieve a {@link BufferedMutator} for performing client-side buffering of writes. The + * {@link BufferedMutator} returned by this method is thread-safe. This object can be used for + * long lived table operations. The caller is responsible for calling + * {@link BufferedMutator#close()} on the returned {@link BufferedMutator} instance. + * + * @param params details on how to instantiate the {@code BufferedMutator}. + * @return a {@link BufferedMutator} for the supplied tableName. + */ + public BufferedMutator getBufferedMutator(BufferedMutatorBuilder params) throws IOException; + + /** * Retrieve a RegionLocator implementation to inspect region information on a table. The returned * RegionLocator is not thread-safe, so a new instance should be created for each using thread. * diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 7c8f8d8..2a80486 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -109,6 +109,17 @@ abstract class ConnectionAdapter implements ClusterConnection { } @Override + public BufferedMutator getBufferedMutator(BufferedMutatorBuilder params) + throws IOException { + return wrappedConnection.getBufferedMutator(params); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + return wrappedConnection.getBufferedMutator(tableName); + } + + @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { return wrappedConnection.getRegionLocator(tableName); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 88e045b..52ed89e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -67,8 +67,8 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; -import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -705,6 +705,28 @@ class ConnectionManager { } @Override + public BufferedMutator getBufferedMutator(BufferedMutatorBuilder builder) { + if (builder.getTableName() == null) { + throw new IllegalArgumentException("TableName cannot be null."); + } + if (builder.getPool() == null) { + builder.pool(HTable.getDefaultExecutor(getConfiguration())); + } + if (builder.getWriteBufferSize() <= 0) { + builder.writeBufferSize(tableConfig.getWriteBufferSize()); + } + 
builder.connection(this); + builder.rpcRetryingCallerFactory(rpcCallerFactory); + builder.rpcControllerFactory(rpcControllerFactory); + return builder.build(); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) { + return getBufferedMutator(new BufferedMutatorBuilder(tableName)); + } + + @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { if (managed) { throw new IOException("The connection has to be unmanaged."); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 7c433e0..d827c61 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -116,10 +115,8 @@ public class HTable implements HTableInterface, RegionLocator { private final TableName tableName; private volatile Configuration configuration; private TableConfiguration tableConfiguration; - protected List writeAsyncBuffer = new LinkedList(); - private long writeBufferSize; + protected BufferedMutatorImpl mutator; private boolean autoFlush = true; - protected long currentWriteBufferSize = 0 ; private boolean closed = false; protected int scannerCaching; private ExecutorService pool; // For Multi & Scan @@ -128,8 +125,6 @@ public class HTable implements HTableInterface, RegionLocator { private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; - /** The Async process for puts with autoflush set to false or multiputs */ - protected AsyncProcess ap; /** The Async process for batch */ protected AsyncProcess multiAp; private RpcRetryingCallerFactory rpcCallerFactory; @@ -325,14 +320,20 @@ public class HTable implements HTableInterface, RegionLocator { } /** - * For internal testing. + * For internal testing. Uses Connection provided in {@code builder}. + * @throws IOException */ @VisibleForTesting - protected HTable() { + protected HTable(BufferedMutatorBuilder builder) throws IOException { + connection = builder.getConnection(); tableName = null; tableConfiguration = new TableConfiguration(); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; + this.mutator = (BufferedMutatorImpl) builder + .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize()) + .writeBufferSize(tableConfiguration.getWriteBufferSize()) + .build(); } /** @@ -352,9 +353,6 @@ public class HTable implements HTableInterface, RegionLocator { this.operationTimeout = tableName.isSystemTable() ? tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); - this.writeBufferSize = tableConfiguration.getWriteBufferSize(); - this.autoFlush = true; - this.currentWriteBufferSize = 0; this.scannerCaching = tableConfiguration.getScannerCaching(); if (this.rpcCallerFactory == null) { @@ -365,7 +363,6 @@ public class HTable implements HTableInterface, RegionLocator { } // puts need to track errors globally due to how the APIs currently work. 
- ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); multiAp = this.connection.getAsyncProcess(); this.closed = false; @@ -542,7 +539,7 @@ public class HTable implements HTableInterface, RegionLocator { */ @Deprecated public List getWriteBuffer() { - return writeAsyncBuffer; + return mutator == null ? null : mutator.getWriteBuffer(); } /** @@ -663,6 +660,8 @@ public class HTable implements HTableInterface, RegionLocator { * This is mainly useful for the MapReduce integration. * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs + * + * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead; */ @Override public List getAllRegionLocations() throws IOException { @@ -1023,11 +1022,11 @@ public class HTable implements HTableInterface, RegionLocator { /** * {@inheritDoc} + * @throws IOException */ @Override - public void put(final Put put) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - doPut(put); + public void put(final Put put) throws IOException { + getBufferedMutator().mutate(put); if (autoFlush) { flushCommits(); } @@ -1035,82 +1034,16 @@ public class HTable implements HTableInterface, RegionLocator { /** * {@inheritDoc} + * @throws IOException */ @Override - public void put(final List puts) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - for (Put put : puts) { - doPut(put); - } + public void put(final List puts) throws IOException { + getBufferedMutator().mutate(puts); if (autoFlush) { flushCommits(); } } - - /** - * Add the put to the buffer. If the buffer is already too large, sends the buffer to the - * cluster. - * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. - * @throws InterruptedIOException if we were interrupted. - */ - private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - writeAsyncBuffer.add(put); - backgroundFlushCommits(true); - } - - validatePut(put); - - currentWriteBufferSize += put.heapSize(); - writeAsyncBuffer.add(put); - - while (currentWriteBufferSize > writeBufferSize) { - backgroundFlushCommits(false); - } - } - - - /** - * Send the operations in the buffer to the servers. Does not wait for the server's answer. - * If the is an error (max retried reach from a previous flush or bad operation), it tries to - * send all operations in the buffer and sends an exception. - * @param synchronous - if true, sends all the writes and wait for all of them to finish before - * returning. 
- */ - private void backgroundFlushCommits(boolean synchronous) throws - InterruptedIOException, RetriesExhaustedWithDetailsException { - - try { - if (!synchronous) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - if (ap.hasError()) { - LOG.debug(tableName + ": One or more of the operations have failed -" + - " waiting for all operation in progress to finish (successfully or not)"); - } - } - if (synchronous || ap.hasError()) { - while (!writeAsyncBuffer.isEmpty()) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - } - RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); - if (error != null) { - throw error; - } - } - } finally { - currentWriteBufferSize = 0; - for (Row mut : writeAsyncBuffer) { - if (mut instanceof Mutation) { - currentWriteBufferSize += ((Mutation) mut).heapSize(); - } - } - } - } - /** * {@inheritDoc} */ @@ -1287,7 +1220,7 @@ public class HTable implements HTableInterface, RegionLocator { controller.setCallTimeout(callTimeout); try { MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + getLocation().getRegionInfo().getRegionName(), row, family, qualifier, new BinaryComparator(value), CompareType.EQUAL, put); MutateResponse response = getStub().mutate(controller, request); return Boolean.valueOf(response.getProcessed()); @@ -1474,12 +1407,11 @@ public class HTable implements HTableInterface, RegionLocator { /** * {@inheritDoc} + * @throws IOException */ @Override - public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // As we can have an operation in progress even if the buffer is empty, we call - // backgroundFlushCommits at least one time. - backgroundFlushCommits(true); + public void flushCommits() throws IOException { + getBufferedMutator().flush(); } /** @@ -1599,7 +1531,11 @@ public class HTable implements HTableInterface, RegionLocator { */ @Override public long getWriteBufferSize() { - return writeBufferSize; + if (mutator == null) { + return tableConfiguration.getWriteBufferSize(); + } else { + return mutator.getWriteBufferSize(); + } } /** @@ -1612,10 +1548,8 @@ public class HTable implements HTableInterface, RegionLocator { */ @Override public void setWriteBufferSize(long writeBufferSize) throws IOException { - this.writeBufferSize = writeBufferSize; - if(currentWriteBufferSize > writeBufferSize) { - flushCommits(); - } + getBufferedMutator(); + mutator.setWriteBufferSize(writeBufferSize); } /** @@ -1928,4 +1862,17 @@ public class HTable implements HTableInterface, RegionLocator { callbackErrorServers); } } + + @VisibleForTesting + BufferedMutator getBufferedMutator() throws IOException { + if (mutator == null) { + this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator( + new BufferedMutatorBuilder(tableName) + .pool(pool) + .writeBufferSize(tableConfiguration.getWriteBufferSize()) + .maxKeyValueSize(tableConfiguration.getMaxKeyValueSize()) + ); + } + return mutator; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java index 911e034..ffd5069 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java @@ -67,8 +67,8 @@ public interface HTableInterface extends Table { * Whether or not to enable 'auto-flush'. 
* @deprecated in 0.96. When called with setAutoFlush(false), this function also * set clearBufferOnFail to true, which is unexpected but kept for historical reasons. - * Replace it with setAutoFlush(false, false) if this is exactly what you want, or by - * {@link #setAutoFlushTo(boolean)} for all other cases. + * Replace it with setAutoFlush(false, false) if this is exactly what you want, though + * this is the method you want for most cases. */ @Deprecated void setAutoFlush(boolean autoFlush); @@ -105,13 +105,69 @@ public interface HTableInterface extends Table { * the value of this parameter is ignored and clearBufferOnFail is set to true. * Setting clearBufferOnFail to false is deprecated since 0.96. * @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use - * {@link #setAutoFlushTo(boolean)}} instead. - * @see #flushCommits + * {@link #setAutoFlush(boolean)}} instead. + * @see BufferedMutator#flush() */ @Deprecated void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail); /** + * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}. + * @deprecated in 0.99 since setting clearBufferOnFail is deprecated. Use + * {@link #setAutoFlush(boolean)} instead, or better still, move on to {@link BufferedMutator} + */ + @Deprecated + void setAutoFlushTo(boolean autoFlush); + + /** + * Tells whether or not 'auto-flush' is turned on. + * + * @return {@code true} if 'auto-flush' is enabled (default), meaning + * {@link Put} operations don't get buffered/delayed and are immediately + * executed. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator} + */ + @Deprecated + boolean isAutoFlush(); + + /** + * Executes all the buffered {@link Put} operations. + *

+ * This method gets called once automatically for every {@link Put} or batch + * of {@link Put}s (when put(List) is used) when + * {@link #isAutoFlush} is {@code true}. + * @throws IOException if a remote or network exception occurs. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#flush()} + */ + @Deprecated + void flushCommits() throws IOException; + + /** + * Returns the maximum size in bytes of the write buffer for this HTable. + *

+ * The default value comes from the configuration parameter + * {@code hbase.client.write.buffer}. + * @return The size of the write buffer in bytes. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator#getWriteBufferSize()} + */ + @Deprecated + long getWriteBufferSize(); + + /** + * Sets the size of the buffer in bytes. + *

+ * If the new size is less than the current amount of data in the + * write buffer, the buffer gets flushed. + * @param writeBufferSize The new write buffer size, in bytes. + * @throws IOException if a remote or network exception occurs. + * @deprecated as of 1.0.0. Replaced by {@link BufferedMutator} and + * {@link BufferedMutatorBuilder#writeBufferSize(long)} + */ + @Deprecated + void setWriteBufferSize(long writeBufferSize) throws IOException; + + + /** * Return the row that matches row exactly, * or the one that immediately precedes it. * diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 07e4c08..dae8a25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -219,9 +219,7 @@ public interface Table extends Closeable { /** * Puts some data in the table. - *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. + * * @param put The data to put. * @throws IOException if a remote or network exception occurs. * @since 0.20.0 @@ -231,9 +229,6 @@ public interface Table extends Closeable { /** * Puts some data in the table, in batch. *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. - *

* This can be used for group commit, or for submitting user defined * batches. The writeBuffer will be periodically inspected while the List * is processed, so depending on the List size the writeBuffer may flush @@ -498,30 +493,6 @@ public interface Table extends Closeable { final Batch.Callback callback) throws ServiceException, Throwable; /** - * Tells whether or not 'auto-flush' is turned on. - * - * @return {@code true} if 'auto-flush' is enabled (default), meaning - * {@link Put} operations don't get buffered/delayed and are immediately - * executed. - */ - boolean isAutoFlush(); - - /** - * Executes all the buffered {@link Put} operations. - *

- * This method gets called once automatically for every {@link Put} or batch - * of {@link Put}s (when put(List) is used) when - * {@link #isAutoFlush} is {@code true}. - * @throws IOException if a remote or network exception occurs. - */ - void flushCommits() throws IOException; - - /** - * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail} - */ - void setAutoFlushTo(boolean autoFlush); - - /** * Returns the maximum size in bytes of the write buffer for this HTable. *

* The default value comes from the configuration parameter @@ -540,7 +511,6 @@ public interface Table extends Closeable { */ void setWriteBufferSize(long writeBufferSize) throws IOException; - /** * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java index 6176a0c..70ad179 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableConfiguration.java @@ -28,20 +28,18 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public class TableConfiguration { - private final long writeBufferSize; + public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer"; + public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152; + public static final String MAX_KEYVALUE_SIZE_KEY = "hbase.client.keyvalue.maxsize"; + public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1; + private final long writeBufferSize; private final int metaOperationTimeout; - private final int operationTimeout; - private final int scannerCaching; - private final int primaryCallTimeoutMicroSecond; - private final int replicaCallTimeoutMicroSecondScan; - private final int retries; - private final int maxKeyValueSize; /** @@ -49,7 +47,7 @@ public class TableConfiguration { * @param conf Configuration object */ TableConfiguration(Configuration conf) { - this.writeBufferSize = conf.getLong("hbase.client.write.buffer", 2097152); + this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT); this.metaOperationTimeout = conf.getInt( HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, @@ -70,7 +68,7 @@ public class TableConfiguration { this.retries = conf.getInt( HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); + this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT); } /** @@ -80,14 +78,14 @@ public class TableConfiguration { */ @VisibleForTesting protected TableConfiguration() { - this.writeBufferSize = 2097152; + this.writeBufferSize = WRITE_BUFFER_SIZE_DEFAULT; this.metaOperationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; this.scannerCaching = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING; this.primaryCallTimeoutMicroSecond = 10000; this.replicaCallTimeoutMicroSecondScan = 1000000; this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER; - this.maxKeyValueSize = -1; + this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; } public long getWriteBufferSize() { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 97c3c37..0223f99 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -154,8 +154,8 @@ public class TestAsyncProcess { new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); } - public MyAsyncProcess( - ClusterConnection hc, 
Configuration conf, boolean useGlobalErrors, boolean dummy) { + public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, + @SuppressWarnings("unused") boolean dummy) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) { @Override @@ -643,26 +643,29 @@ public class TestAsyncProcess { NonceGenerator ng = Mockito.mock(NonceGenerator.class); Mockito.when(ng.getNonceGroup()).thenReturn(HConstants.NO_NONCE); Mockito.when(hc.getNonceGenerator()).thenReturn(ng); + Mockito.when(hc.getConfiguration()).thenReturn(conf); return hc; } @Test public void testHTablePutSuccess() throws Exception { - HTable ht = Mockito.mock(HTable.class); + BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); ht.ap = new MyAsyncProcess(createHConnection(), conf, true); Put put = createPut(1, true); Assert.assertEquals(0, ht.getWriteBufferSize()); - ht.put(put); + ht.mutate(put); Assert.assertEquals(0, ht.getWriteBufferSize()); } private void doHTableFailedPut(boolean bufferOn) throws Exception { - HTable ht = new HTable(); - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; - ht.setAutoFlushTo(true); + BufferedMutatorBuilder builder = new BufferedMutatorBuilder(DUMMY_TABLE) + .connection(createHConnection()); + HTable ht = new HTable(builder); + MyAsyncProcess ap = new MyAsyncProcess(builder.getConnection(), conf, true); + ht.mutator.ap = ap; + ht.mutator.connection = builder.getConnection(); if (bufferOn) { ht.setWriteBufferSize(1024L * 1024L); } else { @@ -671,7 +674,7 @@ public class TestAsyncProcess { Put put = createPut(1, false); - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); try { ht.put(put); if (bufferOn) { @@ -680,7 +683,7 @@ public class TestAsyncProcess { Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { @@ -707,16 +710,15 @@ public class TestAsyncProcess { @Test public void testHTableFailedPutAndNewPut() throws Exception { - HTable ht = new HTable(); - MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; - // This is deprecated method. Using it here only because the new HTable above is a bit of a - // perversion skirting a bunch of setup. Fix the HTable test-only constructor to do more. - ht.setAutoFlush(false, true); - ht.setWriteBufferSize(0); + BufferedMutatorBuilder builder = new BufferedMutatorBuilder(DUMMY_TABLE) + .connection(createHConnection()); + BufferedMutatorImpl mutator = (BufferedMutatorImpl) builder.build(); + MyAsyncProcess ap = new MyAsyncProcess(builder.getConnection(), conf, true); + mutator.ap = ap; + mutator.setWriteBufferSize(0); Put p = createPut(1, false); - ht.put(p); + mutator.mutate(p); ap.waitUntilDone(); // Let's do all the retries. @@ -726,13 +728,13 @@ public class TestAsyncProcess { // puts, we may raise an exception in the middle of the list. It's then up to the caller to // manage what was inserted, what was tried but failed, and what was not even tried. 
p = createPut(1, true); - Assert.assertEquals(0, ht.writeAsyncBuffer.size()); + Assert.assertEquals(0, mutator.getWriteBuffer().size()); try { - ht.put(p); + mutator.mutate(p); Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size()); + Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size()); } @@ -763,9 +765,10 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { - HTable ht = new HTable(); - ht.connection = new MyConnectionImpl(conf); - ht.multiAp = new MyAsyncProcess(ht.connection, conf, false); + BufferedMutatorBuilder builder = new BufferedMutatorBuilder(DUMMY_TABLE) + .connection(new MyConnectionImpl(conf)); + HTable ht = new HTable(builder); + ht.multiAp = new MyAsyncProcess(builder.getConnection(), conf, false); List puts = new ArrayList(); puts.add(createPut(1, true)); @@ -794,26 +797,24 @@ public class TestAsyncProcess { @Test public void testErrorsServers() throws IOException { - HTable ht = new HTable(); Configuration configuration = new Configuration(conf); + BufferedMutatorBuilder builder = new BufferedMutatorBuilder(DUMMY_TABLE) + .connection(new MyConnectionImpl(configuration)); + BufferedMutatorImpl mutator = (BufferedMutatorImpl) builder.build(); configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true); - // set default writeBufferSize - ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152)); - ht.connection = new MyConnectionImpl(configuration); - MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true); - ht.ap = ap; + MyAsyncProcess ap = new MyAsyncProcess(builder.getConnection(), configuration, true); + mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); - Assert.assertTrue(ht.ap.serverTrackerTimeout > 200); - ht.ap.serverTrackerTimeout = 1; + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); + mutator.ap.serverTrackerTimeout = 1; Put p = createPut(1, false); - ht.setAutoFlushTo(false); - ht.put(p); + mutator.mutate(p); try { - ht.flushCommits(); + mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { } @@ -823,19 +824,19 @@ public class TestAsyncProcess { @Test public void testGlobalErrors() throws IOException { - HTable ht = new HTable(); - ht.connection = new MyConnectionImpl(conf); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf); - ht.ap = ap; + BufferedMutatorBuilder builder = new BufferedMutatorBuilder(DUMMY_TABLE) + .connection(new MyConnectionImpl(conf)); + BufferedMutatorImpl mutator = (BufferedMutatorImpl) builder.build(); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(builder.getConnection(), conf); + mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); Put p = createPut(1, true); - ht.setAutoFlushTo(false); - ht.put(p); + mutator.mutate(p); try { - ht.flushCommits(); + mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { } @@ -862,13 +863,14 @@ public class TestAsyncProcess { gets.add(get); } - HTable ht = new HTable(); MyConnectionImpl2 con = new MyConnectionImpl2(hrls); - ht.connection = con; + BufferedMutatorBuilder builder = new BufferedMutatorBuilder(DUMMY_TABLE) + .connection(con); + HTable ht = new 
HTable(builder); MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); ht.multiAp = ap; - ht.batch(gets); + ht.batch(gets, new Object[gets.size()]); Assert.assertEquals(ap.nbActions.get(), NB_REGS); Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index b1ef5b9..2d50c1b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -708,36 +708,47 @@ public class TestClientNoCluster extends Configured implements Tool { * @throws IOException */ static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException { - Table table = sharedConnection.getTable(TableName.valueOf(BIG_USER_TABLE)); - table.setAutoFlushTo(false); long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); long startTime = System.currentTimeMillis(); final int printInterval = 100000; Random rd = new Random(id); boolean get = c.getBoolean("hbase.test.do.gets", false); - try { - Stopwatch stopWatch = new Stopwatch(); - stopWatch.start(); - for (int i = 0; i < namespaceSpan; i++) { - byte [] b = format(rd.nextLong()); - if (get){ + TableName tableName = TableName.valueOf(BIG_USER_TABLE); + if (get) { + try (Table table = sharedConnection.getTable(tableName)){ + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + for (int i = 0; i < namespaceSpan; i++) { + byte [] b = format(rd.nextLong()); Get g = new Get(b); table.get(g); - } else { + if (i % printInterval == 0) { + LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis()); + stopWatch.reset(); + stopWatch.start(); + } + } + LOG.info("Finished a cycle putting " + namespaceSpan + " in " + + (System.currentTimeMillis() - startTime) + "ms"); + } + } else { + try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + for (int i = 0; i < namespaceSpan; i++) { + byte [] b = format(rd.nextLong()); Put p = new Put(b); p.add(HConstants.CATALOG_FAMILY, b, b); - table.put(p); + mutator.mutate(p); + if (i % printInterval == 0) { + LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis()); + stopWatch.reset(); + stopWatch.start(); + } } - if (i % printInterval == 0) { - LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis()); - stopWatch.reset(); - stopWatch.start(); + LOG.info("Finished a cycle putting " + namespaceSpan + " in " + + (System.currentTimeMillis() - startTime) + "ms"); } - } - LOG.info("Finished a cycle putting " + namespaceSpan + " in " + - (System.currentTimeMillis() - startTime) + "ms"); - } finally { - table.close(); } } diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java new file mode 100644 index 0000000..ab03a4f --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedMutatorExample.java @@ -0,0 +1,119 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.example; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * An example of using the {@link BufferedMutator} interface. + */ +public class BufferedMutatorExample extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(BufferedMutatorExample.class); + + private static final int POOL_SIZE = 10; + private static final int TASK_COUNT = 100; + private static final TableName TABLE = TableName.valueOf("foo"); + private static final byte[] FAMILY = Bytes.toBytes("f"); + + @Override + public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException { + + /** a callback invoked when an asynchronous write fails. */ + final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) { + for (int i = 0; i < e.getNumExceptions(); i++) { + LOG.info("Failed to send put " + e.getRow(i) + "."); + } + } + }; + BufferedMutatorBuilder builder = new BufferedMutatorBuilder(TABLE) + .listener(listener); + + // + // step 1: create a single Connection and a BufferedMutator, shared by all worker threads. + // + try (final Connection conn = ConnectionFactory.createConnection(getConf()); + final BufferedMutator mutator = conn.getBufferedMutator(builder)) { + + /** worker pool that operates on BufferedTable instances */ + final ExecutorService workerPool = Executors.newFixedThreadPool(POOL_SIZE); + List> futures = new ArrayList<>(TASK_COUNT); + + for (int i = 0; i < TASK_COUNT; i++) { + futures.add(workerPool.submit(new Callable() { + @Override + public Void call() throws Exception { + // + // step 2: each worker sends edits to the shared BufferedMutator instance. They all use + // the same backing buffer, call-back "listener", and RPC executor pool.
+ // + Put p = new Put(Bytes.toBytes("someRow")); + p.add(FAMILY, Bytes.toBytes("someQualifier"), Bytes.toBytes("some value")); + mutator.mutate(p); + // do work... maybe you want to call mutator.flush() after many edits to ensure any of + // this worker's edits are sent before exiting the Callable + return null; + } + })); + } + + // + // step 3: clean up the worker pool, shut down. + // + for (Future f : futures) { + f.get(5, TimeUnit.MINUTES); + } + workerPool.shutdown(); + } catch (IOException e) { + // exception while creating/destroying Connection or BufferedMutator + LOG.info("exception while creating/destroying Connection or BufferedMutator", e); + } // BufferedMutator.close() ensures all work is flushed. Could be the custom listener is + // invoked from here. + return 0; + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new BufferedMutatorExample(), args); + } +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 0a62966..94eadec 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -53,6 +53,10 @@ import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; @@ -340,7 +344,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { byte[] id; long count = 0; int i; - Table table; + BufferedMutator mutator; + Connection connection; long numNodes; long wrap; int width; @@ -349,7 +354,8 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { protected void setup(Context context) throws IOException, InterruptedException { id = Bytes.toBytes("Job: "+context.getJobID() + " Task: " + context.getTaskAttemptID()); Configuration conf = context.getConfiguration(); - instantiateHTable(conf); + connection = ConnectionFactory.createConnection(conf); + instantiateHTable(); this.width = context.getConfiguration().getInt(GENERATOR_WIDTH_KEY, WIDTH_DEFAULT); current = new byte[this.width][]; int wrapMultiplier = context.getConfiguration().getInt(GENERATOR_WRAP_KEY, WRAP_DEFAULT); @@ -361,15 +367,16 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } - protected void instantiateHTable(Configuration conf) throws IOException { - table = new HTable(conf, getTableName(conf)); - table.setAutoFlushTo(false); - table.setWriteBufferSize(4 * 1024 * 1024); + protected void instantiateHTable() throws IOException { + mutator = connection.getBufferedMutator( + new BufferedMutatorBuilder(getTableName(connection.getConfiguration())) + .writeBufferSize(4 * 1024 * 1024)); } @Override protected void cleanup(Context context) throws IOException ,InterruptedException { - table.close(); + mutator.close(); + connection.close(); } @Override @@ -419,7 +426,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { if (id != null) { 
put.add(FAMILY_NAME, COLUMN_CLIENT, id); } - table.put(put); + mutator.mutate(put); if (i % 1000 == 0) { // Tickle progress every so often else maprunner will think us hung @@ -427,7 +434,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } - table.flushCommits(); + mutator.flush(); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java index e702805..c68ce4d 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -182,10 +181,9 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB } @Override - protected void instantiateHTable(Configuration conf) throws IOException { + protected void instantiateHTable() throws IOException { for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { - Table table = new HTable(conf, getTableName(i)); - table.setAutoFlushTo(true); + Table table = connection.getTable(getTableName(i)); //table.setWriteBufferSize(4 * 1024 * 1024); this.tables[i] = table; } @@ -231,9 +229,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB output.progress(); } } - for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { - tables[j].flushCommits(); - } } } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java index 8a7e9f1..48ccda1 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java @@ -35,11 +35,13 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorBuilder; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -111,8 +113,6 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase { private static final int SCANNER_CACHING = 500; - protected IntegrationTestingUtility util; - private String toRun = null; private enum Counters { @@ -165,7 +165,8 @@ public void cleanUpCluster() throws Exception { extends Mapper { protected long recordsToWrite; - protected Table table; + protected Connection connection; + 
protected BufferedMutator mutator; protected Configuration conf; protected int numBackReferencesPerRow; protected String shortTaskId; @@ -180,9 +181,10 @@ public void cleanUpCluster() throws Exception { recordsToWrite = conf.getLong(NUM_TO_WRITE_KEY, NUM_TO_WRITE_DEFAULT); String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT); numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT); - table = new HTable(conf, TableName.valueOf(tableName)); - table.setWriteBufferSize(4*1024*1024); - table.setAutoFlushTo(false); + this.connection = ConnectionFactory.createConnection(conf); + mutator = connection.getBufferedMutator( + new BufferedMutatorBuilder(TableName.valueOf(tableName)) + .writeBufferSize(4 * 1024 * 1024)); String taskId = conf.get("mapreduce.task.attempt.id"); Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId); @@ -197,8 +199,8 @@ public void cleanUpCluster() throws Exception { @Override public void cleanup(Context context) throws IOException { - table.flushCommits(); - table.close(); + mutator.close(); + connection.close(); } @Override @@ -230,7 +232,7 @@ public void cleanUpCluster() throws Exception { refsWritten.increment(1); } rowsWritten.increment(1); - table.put(p); + mutator.mutate(p); if (i % 100 == 0) { context.setStatus("Written " + i + "/" + recordsToWrite + " records"); @@ -239,7 +241,7 @@ public void cleanUpCluster() throws Exception { } // End of block, flush all of them before we start writing anything // pointing to these! - table.flushCommits(); + mutator.flush(); } } } @@ -315,7 +317,7 @@ public void cleanUpCluster() throws Exception { NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT)); conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJobName(TEST_NAME + " Load for " + htd.getTableName()); job.setJarByClass(this.getClass()); setMapperClass(job); @@ -339,7 +341,7 @@ public void cleanUpCluster() throws Exception { protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception { Path outputDir = getTestDir(TEST_NAME, "verify-output"); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(this.getClass()); job.setJobName(TEST_NAME + " Verification for " + htd.getTableName()); setJobScannerConf(job); @@ -393,7 +395,7 @@ public void cleanUpCluster() throws Exception { // Only disable and drop if we succeeded to verify - otherwise it's useful // to leave it around for post-mortem - getTestingUtil(getConf()).deleteTable(htd.getName()); + getTestingUtil(getConf()).deleteTable(htd.getTableName()); } public void usage() { @@ -449,15 +451,17 @@ public void cleanUpCluster() throws Exception { HTableDescriptor htd = new HTableDescriptor(table); htd.addFamily(new HColumnDescriptor(TEST_FAMILY)); - Admin admin = new HBaseAdmin(getConf()); - if (doLoad) { - admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits); - doLoad(getConf(), htd); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { + if (doLoad) { + admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits); + doLoad(getConf(), htd); + } } if (doVerify) { doVerify(getConf(), htd); if (doDelete) { - getTestingUtil(getConf()).deleteTable(htd.getName()); + getTestingUtil(getConf()).deleteTable(htd.getTableName()); } } return 0; diff --git 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java index ea9b228..e68cb38 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java @@ -176,7 +176,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); p.setCellVisibility(new CellVisibility(exp)); getCounter(expIdx).increment(1); - table.put(p); + mutator.mutate(p); if (i % 100 == 0) { context.setStatus("Written " + i + "/" + recordsToWrite + " records"); @@ -185,7 +185,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT } // End of block, flush all of them before we start writing anything // pointing to these! - table.flushCommits(); + mutator.flush(); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index c1dc5ea..0944586 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -22,11 +22,12 @@ import org.apache.commons.cli.CommandLine; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -234,12 +235,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { private LinkedBlockingQueue insertData() throws IOException, InterruptedException { LinkedBlockingQueue rowKeys = new LinkedBlockingQueue(25000); - Table ht = new HTable(util.getConfiguration(), this.tableName); + BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); byte[] value = new byte[300]; for (int x = 0; x < 5000; x++) { TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); try { - ht.setAutoFlushTo(false); for (int i = 0; i < 5; i++) { long rk = random.nextLong(); rowKeys.add(rk); @@ -248,7 +248,7 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { random.nextBytes(value); p.add(familyName, Bytes.toBytes(random.nextLong()), value); } - ht.put(p); + ht.mutate(p); } if ((x % 1000) == 0) { admin.flush(tableName); diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java index 5836442..dad5a32 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java @@ -227,7 +227,6 @@ public class RowResource extends ResourceBase { } table = 
servlet.getTable(tableResource.getName()); table.put(puts); - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); @@ -489,7 +488,6 @@ public class RowResource extends ResourceBase { .type(MIMETYPE_TEXT).entity("Value not Modified" + CRLF) .build(); } - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); @@ -580,7 +578,6 @@ public class RowResource extends ResourceBase { .type(MIMETYPE_TEXT).entity(" Delete check failed." + CRLF) .build(); } - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulDeleteRequests(1); return response.build(); diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java index aaf7d59..ad1ca29 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java @@ -33,10 +33,11 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -137,7 +138,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private int presplitRegions = 0; private boolean useTags = false; private int noOfTags = 1; - private HConnection connection; + private Connection connection; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); /** @@ -501,7 +502,7 @@ public class PerformanceEvaluation extends Configured implements Tool { value.getRows(), value.getTotalRows(), value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), value.getNoOfTags(), - HConnectionManager.createConnection(context.getConfiguration()), status); + ConnectionFactory.createConnection(context.getConfiguration()), status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. 
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -609,7 +610,7 @@ public class PerformanceEvaluation extends Configured implements Tool { final int preSplitRegions = this.presplitRegions; final boolean useTags = this.useTags; final int numTags = this.noOfTags; - final HConnection connection = HConnectionManager.createConnection(getConf()); + final Connection connection = ConnectionFactory.createConnection(getConf()); for (int i = 0; i < this.N; i++) { final int index = i; Thread t = new Thread ("TestClient-" + i) { @@ -684,7 +685,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); @@ -790,14 +791,14 @@ public class PerformanceEvaluation extends Configured implements Tool { private boolean writeToWAL = true; private boolean useTags = false; private int noOfTags = 0; - private HConnection connection; + private Connection connection; TestOptions() { } TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, - int noOfTags, HConnection connection) { + int noOfTags, Connection connection) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; @@ -838,7 +839,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return writeToWAL; } - public HConnection getConnection() { + public Connection getConnection() { return connection; } @@ -870,13 +871,11 @@ public class PerformanceEvaluation extends Configured implements Tool { protected final int totalRows; private final Status status; protected TableName tableName; - protected Table table; protected volatile Configuration conf; - protected boolean flushCommits; protected boolean writeToWAL; protected boolean useTags; protected int noOfTags; - protected HConnection connection; + protected Connection connection; /** * Note that all subclasses of this class must provide a public contructor @@ -889,9 +888,7 @@ public class PerformanceEvaluation extends Configured implements Tool { this.totalRows = options.getTotalRows(); this.status = status; this.tableName = options.getTableName(); - this.table = null; this.conf = conf; - this.flushCommits = options.isFlushCommits(); this.writeToWAL = options.isWriteToWAL(); this.useTags = options.isUseTags(); this.noOfTags = options.getNumTags(); @@ -907,18 +904,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return period == 0? this.perClientRunRows: period; } - void testSetup() throws IOException { - this.table = connection.getTable(tableName); - this.table.setAutoFlushTo(false); - } - - void testTakedown() throws IOException { - if (flushCommits) { - this.table.flushCommits(); - } - table.close(); - } - + abstract void testTakedown() throws IOException; /* * Run test * @return Elapsed time. @@ -936,6 +922,8 @@ public class PerformanceEvaluation extends Configured implements Tool { return (System.nanoTime() - startTime) / 1000000; } + abstract void testSetup() throws IOException; + /** * Provides an extension point for tests that don't want a per row invocation. 
*/ @@ -957,8 +945,45 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException; } - @SuppressWarnings("unused") - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + public TableTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + void testSetup() throws IOException { + this.table = connection.getTable(tableName); + } + + @Override + void testTakedown() throws IOException { + table.close(); + } + } + + static abstract class BufferedMutatorTest extends Test { + protected BufferedMutator mutator; + protected boolean flushCommits; + + public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + this.flushCommits = options.isFlushCommits(); + } + + void testSetup() throws IOException { + this.mutator = connection.getBufferedMutator(tableName); + } + + void testTakedown() throws IOException { + if (flushCommits) { + this.mutator.flush(); + } + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -981,7 +1006,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @SuppressWarnings("unused") - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1065,7 +1090,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1085,7 +1110,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1109,11 +1134,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Configuration conf, TestOptions options, Status status) { @@ -1141,7 +1166,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1155,7 +1180,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1180,11 +1205,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Configuration conf, TestOptions options, Status status) { @@ -1268,7 +1293,7 @@ public class PerformanceEvaluation extends Configured implements Tool { long runOneClient(final Class cmd, final int startRow, final int perClientRunRows, final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, - HConnection connection, final Status status) + Connection connection, final Status status) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); @@ -1463,7 +1488,7 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } - this.connection = HConnectionManager.createConnection(getConf()); + this.connection = ConnectionFactory.createConnection(getConf()); final String useTags = "--usetags="; if (cmd.startsWith(useTags)) { diff --git a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java index baf9961..121ff65 100644 --- a/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java +++ b/hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java @@ -20,10 +20,10 @@ package org.apache.hadoop.hbase.rest.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import java.io.IOException; import java.util.ArrayList; @@ -37,12 +37,12 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -99,9 +99,7 @@ public class TestRemoteTable { htd.addFamily(new HColumnDescriptor(COLUMN_2).setMaxVersions(3)); htd.addFamily(new HColumnDescriptor(COLUMN_3).setMaxVersions(3)); admin.createTable(htd); - Table table = null; - try { - table = new HTable(TEST_UTIL.getConfiguration(), TABLE); + try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) { Put put = new Put(ROW_1); put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1); table.put(put); @@ -110,9 +108,6 @@ public class TestRemoteTable { put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2); put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2); table.put(put); - table.flushCommits(); - } finally { - if (null != table) table.close(); } remoteTable = new RemoteHTable( new Client(new Cluster().add("localhost", @@ -349,7 +344,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_2, value2)); Delete delete = new Delete(ROW_3); - delete.deleteColumn(COLUMN_2, QUALIFIER_2); + 
delete.addColumn(COLUMN_2, QUALIFIER_2); remoteTable.delete(delete); get = new Get(ROW_3); @@ -464,7 +459,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_1, value1)); assertNull(value2); assertTrue(remoteTable.exists(get)); - assertEquals(1, remoteTable.exists(Collections.singletonList(get)).length); + assertEquals(1, remoteTable.existsAll(Collections.singletonList(get)).length); Delete delete = new Delete(ROW_1); remoteTable.checkAndDelete(ROW_1, COLUMN_1, QUALIFIER_1, VALUE_1, delete); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index c583923..1f84bb4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -55,8 +55,7 @@ import com.google.protobuf.ServiceException; */ public class HTableWrapper implements HTableInterface { - private TableName tableName; - private final Table table; + private final HTableInterface table; private ClusterConnection connection; private final List openTables; @@ -73,7 +72,6 @@ public class HTableWrapper implements HTableInterface { private HTableWrapper(List openTables, TableName tableName, ClusterConnection connection, ExecutorService pool) throws IOException { - this.tableName = tableName; this.table = connection.getTable(tableName, pool); this.connection = connection; this.openTables = openTables; @@ -232,7 +230,7 @@ public class HTableWrapper implements HTableInterface { @Override public byte[] getTableName() { - return tableName.getName(); + return table.getTableName(); } @Override @@ -307,7 +305,7 @@ public class HTableWrapper implements HTableInterface { @Override public void setAutoFlush(boolean autoFlush) { - table.setAutoFlushTo(autoFlush); + table.setAutoFlush(autoFlush); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java index 563b1f8..6e0d9e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InvalidJobConfException; @@ -52,22 +52,22 @@ public class TableOutputFormat extends FileOutputFormat { - private Table m_table; + private BufferedMutator m_mutator; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the * lifecycle of {@code conn}. 
*/ - public TableRecordWriter(final Table table) throws IOException { - this.m_table = table; + public TableRecordWriter(final BufferedMutator mutator) throws IOException { + this.m_mutator = mutator; } public void close(Reporter reporter) throws IOException { - this.m_table.close(); + this.m_mutator.close(); } public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_table.put(new Put(value)); + m_mutator.mutate(new Put(value)); } } @@ -77,13 +77,12 @@ public class TableOutputFormat extends FileOutputFormat { private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class); - Map tables; + Connection connection; + Map mutatorMap = new HashMap<>(); Configuration conf; boolean useWriteAheadLogging; @@ -88,7 +92,6 @@ public class MultiTableOutputFormat extends OutputFormat(); this.conf = conf; this.useWriteAheadLogging = useWriteAheadLogging; } @@ -96,24 +99,28 @@ public class MultiTableOutputFormat extends OutputFormat { private Connection connection; - private Table table; + private BufferedMutator mutator; /** * @throws IOException @@ -95,8 +95,7 @@ implements Configurable { public TableRecordWriter() throws IOException { String tableName = conf.get(OUTPUT_TABLE); this.connection = ConnectionFactory.createConnection(conf); - this.table = connection.getTable(TableName.valueOf(tableName)); - this.table.setAutoFlushTo(false); + this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName)); LOG.info("Created table instance for " + tableName); } /** @@ -104,12 +103,12 @@ implements Configurable { * * @param context The context. * @throws IOException When closing the writer fails. - * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext) + * @see RecordWriter#close(TaskAttemptContext) */ @Override public void close(TaskAttemptContext context) throws IOException { - table.close(); + mutator.close(); connection.close(); } @@ -119,14 +118,15 @@ implements Configurable { * @param key The key. * @param value The value. * @throws IOException When writing fails. - * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object) + * @see RecordWriter#write(Object, Object) */ @Override public void write(KEY key, Mutation value) throws IOException { - if (value instanceof Put) table.put(new Put((Put)value)); - else if (value instanceof Delete) table.delete(new Delete((Delete)value)); - else throw new IOException("Pass a Delete or a Put"); + if (!(value instanceof Put) && !(value instanceof Delete)) { + throw new IOException("Pass a Delete or a Put"); + } + mutator.mutate(value); } } @@ -137,11 +137,9 @@ implements Configurable { * @return The newly created writer instance. * @throws IOException When creating the writer fails. * @throws InterruptedException When the jobs is cancelled. - * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext) */ @Override - public RecordWriter getRecordWriter( - TaskAttemptContext context) + public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { return new TableRecordWriter(); } @@ -152,7 +150,7 @@ implements Configurable { * @param context The current context. * @throws IOException When the check fails. * @throws InterruptedException When the job is aborted. 
- * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext) + * @see OutputFormat#checkOutputSpecs(JobContext) */ @Override public void checkOutputSpecs(JobContext context) throws IOException, @@ -168,7 +166,7 @@ implements Configurable { * @return The committer. * @throws IOException When creating the committer fails. * @throws InterruptedException When the job is aborted. - * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext) + * @see OutputFormat#getOutputCommitter(TaskAttemptContext) */ @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 53aa05c..a7118a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -52,13 +52,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -393,6 +392,7 @@ public class PerformanceEvaluation extends Configured implements Tool { throws IOException, InterruptedException { final Class cmd = determineCommandClass(opts.cmdName); assert cmd != null; + @SuppressWarnings("unchecked") Future[] threads = new Future[opts.numClientThreads]; RunResult[] results = new RunResult[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, @@ -458,7 +458,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation - " + opts.cmdName); @@ -908,7 +908,6 @@ public class PerformanceEvaluation extends Configured implements Tool { private final Sampler traceSampler; private final SpanReceiverHost receiverHost; protected Connection connection; - protected Table table; private String testName; private Histogram latency; @@ -989,25 +988,25 @@ public class PerformanceEvaluation extends Configured implements Tool { if (!opts.oneCon) { this.connection = ConnectionFactory.createConnection(conf); } - this.table = new HTable(TableName.valueOf(opts.tableName), connection); - this.table.setAutoFlushTo(opts.autoFlush); + onStartup(); latency = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); } + abstract void onStartup() throws IOException; + void testTakedown() throws IOException { reportLatency(); reportValueSize(); - 
if (opts.flushCommits) { - this.table.flushCommits(); - } - table.close(); + onTakedown(); if (!opts.oneCon) { connection.close(); } receiverHost.closeReceivers(); } + abstract void onTakedown() throws IOException; + /* * Run test * @return Elapsed time. @@ -1098,7 +1097,43 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException, InterruptedException; } - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + TableTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.table = connection.getTable(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + table.close(); + } + } + + static abstract class BufferedMutatorTest extends Test { + protected BufferedMutator mutator; + + BufferedMutatorTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1128,7 +1163,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1216,7 +1251,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { private final Consistency consistency; private ArrayList gets; private Random rd = new Random(); @@ -1270,7 +1305,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1296,11 +1331,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Connection con, TestOptions options, Status status) { @@ -1333,7 +1368,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1349,7 +1384,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1375,11 +1410,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.mutate(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Connection con, TestOptions options, Status status) { @@ -1531,12 +1566,9 @@ public class PerformanceEvaluation extends Configured implements Tool { // Log the configuration we're going to run with. Uses JSON mapper because lazy. It'll do // the TestOptions introspection for us and dump the output in a readable format. LOG.info(cmd.getSimpleName() + " test run options=" + MAPPER.writeValueAsString(opts)); - Admin admin = null; - try { - admin = new HBaseAdmin(getConf()); + try(Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { checkTable(admin, opts); - } finally { - if (admin != null) admin.close(); } if (opts.nomapred) { doLocalClients(opts, getConf()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java index b6502c5..e05a2fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java @@ -101,31 +101,30 @@ public class TestCloneSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); - try { - // enable table and insert data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); + // enable table and insert data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)){ snapshot0Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); + } + admin.disableTable(tableName); - // take a snapshot - admin.snapshot(snapshotName0, tableName); + // take a snapshot + admin.snapshot(snapshotName0, tableName); - // enable table and insert more data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); + // enable table and insert more data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try 
(Table table = TEST_UTIL.getConnection().getTable(tableName)){ snapshot1Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); + } + admin.disableTable(tableName); - // take a snapshot of the updated table - admin.snapshot(snapshotName1, tableName); + // take a snapshot of the updated table + admin.snapshot(snapshotName1, tableName); - // re-enable table - admin.enableTable(tableName); - } finally { - table.close(); - } + // re-enable table + admin.enableTable(tableName); } protected int getNumReplicas() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index bc805fe..f10e88b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -692,15 +692,15 @@ public class TestFromClientSide { public void testMaxKeyValueSize() throws Exception { byte [] TABLE = Bytes.toBytes("testMaxKeyValueSize"); Configuration conf = TEST_UTIL.getConfiguration(); - String oldMaxSize = conf.get("hbase.client.keyvalue.maxsize"); + String oldMaxSize = conf.get(TableConfiguration.MAX_KEYVALUE_SIZE_KEY); Table ht = TEST_UTIL.createTable(TABLE, FAMILY); byte[] value = new byte[4 * 1024 * 1024]; Put put = new Put(ROW); put.add(FAMILY, QUALIFIER, value); ht.put(put); try { - TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 2 * 1024 * 1024); - TABLE = Bytes.toBytes("testMaxKeyValueSize2"); + TEST_UTIL.getConfiguration().setInt( + TableConfiguration.MAX_KEYVALUE_SIZE_KEY, 2 * 1024 * 1024); // Create new table so we pick up the change in Configuration. try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { @@ -712,7 +712,7 @@ public class TestFromClientSide { } fail("Inserting a too large KeyValue worked, should throw exception"); } catch(Exception e) {} - conf.set("hbase.client.keyvalue.maxsize", oldMaxSize); + conf.set(TableConfiguration.MAX_KEYVALUE_SIZE_KEY, oldMaxSize); } @Test @@ -3906,7 +3906,7 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"), new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY}); - table.setAutoFlushTo(false); + table.setAutoFlush(false); ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { byte[] row = Bytes.toBytes("row" + i); @@ -3937,6 +3937,7 @@ public class TestFromClientSide { Result row : scanner) nbRows++; assertEquals(NB_BATCH_ROWS * 10, nbRows); + table.close(); } @Test @@ -3947,7 +3948,6 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"), new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY }); - table.setAutoFlushTo(false); table.setWriteBufferSize(10); ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { @@ -3959,8 +3959,6 @@ public class TestFromClientSide { } table.put(rowsUpdate); - table.flushCommits(); - Scan scan = new Scan(); scan.addFamily(CONTENTS_FAMILY); ResultScanner scanner = table.getScanner(scan); @@ -4149,6 +4147,7 @@ public class TestFromClientSide { HBaseAdmin ha = new HBaseAdmin(t.getConnection()); assertTrue(ha.tableExists(tableName)); assertTrue(t.get(new Get(ROW)).isEmpty()); + ha.close(); } /** @@ -4162,9 +4161,10 @@ public class TestFromClientSide { final TableName tableName = 
TableName.valueOf("testUnmanagedHConnectionReconnect"); HTable t = createUnmangedHConnectionHTable(tableName); Connection conn = t.getConnection(); - HBaseAdmin ha = new HBaseAdmin(conn); - assertTrue(ha.tableExists(tableName)); - assertTrue(t.get(new Get(ROW)).isEmpty()); + try (HBaseAdmin ha = new HBaseAdmin(conn)) { + assertTrue(ha.tableExists(tableName)); + assertTrue(t.get(new Get(ROW)).isEmpty()); + } // stop the master MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); @@ -4177,9 +4177,10 @@ public class TestFromClientSide { // test that the same unmanaged connection works with a new // HBaseAdmin and can connect to the new master; - HBaseAdmin newAdmin = new HBaseAdmin(conn); - assertTrue(newAdmin.tableExists(tableName)); - assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES); + try (HBaseAdmin newAdmin = new HBaseAdmin(conn)) { + assertTrue(newAdmin.tableExists(tableName)); + assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES); + } } @Test @@ -4276,7 +4277,6 @@ public class TestFromClientSide { new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024); // set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow // in Store.rowAtOrBeforeFromStoreFile - table.setAutoFlush(true); String regionName = table.getRegionLocations().firstKey().getEncodedName(); HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName); @@ -4351,6 +4351,8 @@ public class TestFromClientSide { assertTrue(result.containsColumn(HConstants.CATALOG_FAMILY, null)); assertTrue(Bytes.equals(result.getRow(), forthRow)); assertTrue(Bytes.equals(result.getValue(HConstants.CATALOG_FAMILY, null), four)); + + table.close(); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 428c637..61cb16a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -149,7 +149,7 @@ public class TestMultiParallel { ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); try { try (Table t = connection.getTable(TEST_TABLE, executor)) { - List puts = constructPutRequests(); // creates a Put for every region + List puts = constructPutRequests(); // creates a Put for every region t.batch(puts); HashSet regionservers = new HashSet(); try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { @@ -172,7 +172,7 @@ public class TestMultiParallel { Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // load test data - List puts = constructPutRequests(); + List puts = constructPutRequests(); table.batch(puts); // create a list of gets and run it @@ -261,17 +261,13 @@ public class TestMultiParallel { private void doTestFlushCommits(boolean doAbort) throws Exception { // Load the data LOG.info("get new table"); - Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); - table.setAutoFlushTo(false); + Table table = UTIL.getConnection().getTable(TEST_TABLE); table.setWriteBufferSize(10 * 1024 * 1024); LOG.info("constructPutRequests"); - List puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } + List puts = constructPutRequests(); + table.put(puts); LOG.info("puts"); - table.flushCommits(); final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads() .size(); assert 
liveRScount > 0; @@ -290,11 +286,7 @@ public class TestMultiParallel { // try putting more keys after the abort. same key/qual... just validating // no exceptions thrown puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } - - table.flushCommits(); + table.put(puts); } LOG.info("validating loaded data"); @@ -331,7 +323,7 @@ public class TestMultiParallel { LOG.info("test=testBatchWithPut"); Table table = CONNECTION.getTable(TEST_TABLE); // put multiple rows using a batch - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -363,7 +355,7 @@ public class TestMultiParallel { Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // Load some data - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -371,7 +363,7 @@ public class TestMultiParallel { List deletes = new ArrayList(); for (int i = 0; i < KEYS.length; i++) { Delete delete = new Delete(KEYS[i]); - delete.deleteFamily(BYTES_FAMILY); + delete.addFamily(BYTES_FAMILY); deletes.add(delete); } results = table.batch(deletes); @@ -392,7 +384,7 @@ public class TestMultiParallel { Table table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // Load some data - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -664,8 +656,8 @@ public class TestMultiParallel { } } - private List constructPutRequests() { - List puts = new ArrayList(); + private List constructPutRequests() { + List puts = new ArrayList<>(); for (byte[] k : KEYS) { Put put = new Put(k); put.add(BYTES_FAMILY, QUALIFIER, VALUE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java index 0eec477..d488f33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java @@ -110,11 +110,12 @@ public class TestRestoreSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName); // enable table and insert data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot0Rows = TEST_UTIL.countRows(table); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot0Rows = TEST_UTIL.countRows(table); + } admin.disableTable(tableName); // take a snapshot @@ -122,9 +123,10 @@ public class TestRestoreSnapshotFromClient { // enable table and insert more data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot1Rows = TEST_UTIL.countRows(table); - table.close(); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot1Rows = TEST_UTIL.countRows(table); + } } @After @@ -183,7 +185,7 @@ public class TestRestoreSnapshotFromClient { assertEquals(2, table.getTableDescriptor().getFamilies().size()); HTableDescriptor htd = admin.getTableDescriptor(tableName); assertEquals(2, 
htd.getFamilies().size()); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2); long snapshot2Rows = snapshot1Rows + 500; assertEquals(snapshot2Rows, TEST_UTIL.countRows(table)); assertEquals(500, TEST_UTIL.countRows(table, TEST_FAMILY2)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index 3db2d9f..94a7819 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -130,18 +130,18 @@ public class TestRpcControllerFactory { // change one of the connection properties so we get a new HConnection with our configuration conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1); - Table table = new HTable(conf, name); - table.setAutoFlushTo(false); + Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(name); byte[] row = Bytes.toBytes("row"); Put p = new Put(row); p.add(fam1, fam1, Bytes.toBytes("val0")); table.put(p); - table.flushCommits(); + Integer counter = 1; counter = verifyCount(counter); Delete d = new Delete(row); - d.deleteColumn(fam1, fam1); + d.addColumn(fam1, fam1); table.delete(d); counter = verifyCount(counter); @@ -200,4 +200,4 @@ public class TestRpcControllerFactory { assertEquals(0, CountingRpcController.INT_PRIORITY.get()); return counter + 1; } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java index 1c6bf8e..d178ba1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java @@ -175,11 +175,11 @@ public class TestHTableWrapper { private void checkAutoFlush() { boolean initialAutoFlush = hTableInterface.isAutoFlush(); - hTableInterface.setAutoFlushTo(false); + hTableInterface.setAutoFlush(false); assertFalse(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(true); + hTableInterface.setAutoFlush(true); assertTrue(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(initialAutoFlush); + hTableInterface.setAutoFlush(initialAutoFlush); } private void checkBufferSize() throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index 8daafe4..fa4564c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -937,7 +937,6 @@ public class TestDistributedLogSplitting { if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } - ht.setAutoFlushTo(true); Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); @@ -1607,11 +1606,11 @@ public class TestDistributedLogSplitting { /** * Load table with puts and deletes with expected values so that we can verify later */ - private void prepareData(final HTable t, final byte[] f, final byte[] column) 
throws IOException { - t.setAutoFlushTo(false); + private void prepareData(final Table t, final byte[] f, final byte[] column) throws IOException { byte[] k = new byte[3]; // add puts + List puts = new ArrayList<>(); for (byte b1 = 'a'; b1 <= 'z'; b1++) { for (byte b2 = 'a'; b2 <= 'z'; b2++) { for (byte b3 = 'a'; b3 <= 'z'; b3++) { @@ -1620,11 +1619,11 @@ public class TestDistributedLogSplitting { k[2] = b3; Put put = new Put(k); put.add(f, column, k); - t.put(put); + puts.add(put); } } } - t.flushCommits(); + t.put(puts); // add deletes for (byte b3 = 'a'; b3 <= 'z'; b3++) { k[0] = 'a'; @@ -1633,7 +1632,6 @@ public class TestDistributedLogSplitting { Delete del = new Delete(k); t.delete(del); } - t.flushCommits(); } private void waitForCounter(AtomicLong ctr, long oldval, long newval, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index c1c148e..70cb2fc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -82,11 +82,11 @@ public class TestMaster { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); - HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME); - assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, - ZooKeeperProtos.Table.State.ENABLED)); - TEST_UTIL.loadTable(ht, FAMILYNAME, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME)) { + assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, + ZooKeeperProtos.Table.State.ENABLED)); + TEST_UTIL.loadTable(ht, FAMILYNAME, false); + } List> tableRegions = MetaTableAccessor.getTableRegionsAndLocations( m.getZooKeeper(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 6d2b172..179ae9a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -101,9 +101,9 @@ public class TestEndToEndSplitTransaction { TableName tableName = TableName.valueOf("TestSplit"); byte[] familyName = Bytes.toBytes("fam"); - HTable ht = TEST_UTIL.createTable(tableName, familyName); - TEST_UTIL.loadTable(ht, familyName, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) { + TEST_UTIL.loadTable(ht, familyName, false); + } HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0); byte []firstRow = Bytes.toBytes("aaa"); byte []splitRow = Bytes.toBytes("lll"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index c508e70..d24ba4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -31,7 +31,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.FilterFileSystem; @@ -42,11 +41,11 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.fs.HFileSystem; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFileContext; @@ -200,23 +199,22 @@ public class TestFSErrorsExposed { util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Make a new Configuration so it makes a new connection that has the // above configuration on it; else we use the old one w/ 10 as default. - HTable table = new HTable(new Configuration(util.getConfiguration()), tableName); - - // Load some data - util.loadTable(table, fam, false); - table.flushCommits(); - util.flush(); - util.countRows(table); + try (Table table = util.getConnection().getTable(tableName)) { + // Load some data + util.loadTable(table, fam, false); + util.flush(); + util.countRows(table); - // Kill the DFS cluster - util.getDFSCluster().shutdownDataNodes(); + // Kill the DFS cluster + util.getDFSCluster().shutdownDataNodes(); - try { - util.countRows(table); - fail("Did not fail to count after removing data"); - } catch (Exception e) { - LOG.info("Got expected error", e); - assertTrue(e.getMessage().contains("Could not seek")); + try { + util.countRows(table); + fail("Did not fail to count after removing data"); + } catch (Exception e) { + LOG.info("Got expected error", e); + assertTrue(e.getMessage().contains("Could not seek")); + } } // Restart data nodes so that HBase can shut down cleanly. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java index 46a4062..08b3b99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java @@ -78,6 +78,7 @@ public class TestRegionFavoredNodes { @AfterClass public static void tearDownAfterClass() throws Exception { + table.close(); if (createWithFavoredNode == null) { return; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index 9ba224a..473946c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -30,6 +30,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; + import static org.junit.Assert.*; import java.io.IOException; @@ -108,10 +109,11 @@ public class TestRegionServerMetrics { TEST_UTIL.createTable(tName, cfName); - new HTable(conf, tName).close(); //wait for the table to come up. + Connection connection = TEST_UTIL.getConnection(); + connection.getTable(tName).close(); //wait for the table to come up. 
// Do a first put to be sure that the connection is established, meta is there and so on. - HTable table = new HTable(conf, tName); + Table table = connection.getTable(tName); Put p = new Put(row); p.add(cfName, qualifier, initValue); table.put(p); @@ -140,19 +142,21 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 10, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - for ( HRegionInfo i:table.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - "_metric"; - metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); - metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + try (RegionLocator locator = connection.getRegionLocator(tName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); + metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + } } - List gets = new ArrayList(); for (int i=0; i< 10; i++) { gets.add(new Get(row)); @@ -169,11 +173,11 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 20, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - table.setAutoFlushTo(false); + List puts = new ArrayList<>(); for (int i=0; i< 30; i++) { - table.put(p); + puts.add(p); } - table.flushCommits(); + table.put(puts); metricsRegionServer.getRegionServerWrapper().forceRecompute(); metricsHelper.assertCounter("totalRequestCount", requests + 80, serverSource); @@ -338,36 +342,39 @@ public class TestRegionServerMetrics { byte[] val = Bytes.toBytes("One"); - TEST_UTIL.createTable(tableName, cf); - HTable t = new HTable(conf, tableName); - t.setAutoFlushTo(false); + List puts = new ArrayList<>(); for (int insertCount =0; insertCount < 100; insertCount++) { Put p = new Put(Bytes.toBytes("" + insertCount + "row")); p.add(cf, qualifier, val); - t.put(p); + puts.add(p); } - t.flushCommits(); - - Scan s = new Scan(); - s.setBatch(1); - s.setCaching(1); - ResultScanner resultScanners = t.getScanner(s); - - for (int nextCount = 0; nextCount < 30; nextCount++) { - Result result = resultScanners.next(); - assertNotNull(result); - assertEquals(1, result.size()); + try (HTable t = TEST_UTIL.createTable(tableName, cf)) { + t.put(puts); + + Scan s = new Scan(); + s.setBatch(1); + s.setCaching(1); + ResultScanner resultScanners = t.getScanner(s); + + for (int nextCount = 0; nextCount < 30; nextCount++) { + Result result = resultScanners.next(); + assertNotNull(result); + assertEquals(1, result.size()); + } } - for ( HRegionInfo i:t.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - 
"_metric"; - metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + } } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index 8e3ca88..63d9c9e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -88,7 +88,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); @@ -169,19 +168,16 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); Put put1 = new Put(Bytes.toBytes("row2")); put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put1); - table.flushCommits(); admin.flush(tableName); put0 = new Put(Bytes.toBytes("row1")); put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version1"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); admin.compact(tableName); @@ -218,8 +214,7 @@ public class TestScannerWithBulkload { put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes.toBytes("version0"))); table.put(put1); - table.flushCommits(); - bulkload.doBulkLoad(hfilePath, table); + bulkload.doBulkLoad(hfilePath, (HTable) table); latch.countDown(); } catch (TableNotFoundException e) { } catch (IOException e) { @@ -260,7 +255,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 0890c48..2e18fba 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -310,8 +310,8 @@ public class TestLogRolling { desc.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY)); admin.createTable(desc); - Table table = new HTable(TEST_UTIL.getConfiguration(), desc.getTableName()); - assertTrue(table.isAutoFlush()); + Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); + assertTrue(((HTable) 
table).isAutoFlush()); server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); final FSHLog log = (FSHLog) server.getWAL(null); @@ -455,8 +455,6 @@ public class TestLogRolling { writeData(table, 1002); - table.setAutoFlushTo(true); - long curTime = System.currentTimeMillis(); LOG.info("log.getCurrentFileName()): " + DefaultWALProvider.getCurrentFileName(log)); long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index d0caa45..d858321 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -27,7 +27,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -53,7 +52,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(false); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for (JVMClusterUtil.RegionServerThread r : @@ -118,7 +116,10 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas Put put = new Put(row); put.add(famName, row, row); - htable1 = new HTable(conf1, tableName); + if (htable1 == null) { + htable1 = utility1.getConnection().getTable(tableName); + } + htable1.put(put); Get get = new Get(row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index 7ecdaf7..4163b66 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -35,12 +35,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -69,7 +69,6 @@ public class TestReplicationSmallTests extends TestReplicationBase { */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for ( JVMClusterUtil.RegionServerThread r : diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java index 6a39c8a..5010365 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java @@ -149,9 +149,8 @@ public class TestReplicationWithTags { Admin admin = conn.getAdmin()) { admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } - htable1 = new HTable(conf1, TABLE_NAME); - htable1.setWriteBufferSize(1024); - htable2 = new HTable(conf2, TABLE_NAME); + htable1 = utility1.getConnection().getTable(TABLE_NAME); + htable2 = utility2.getConnection().getTable(TABLE_NAME); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 1b1312a..98a0886 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -393,25 +393,18 @@ public class TestVisibilityLabelsReplication { } static Table writeData(TableName tableName, String... labelExps) throws Exception { - HTable table = null; - try { - table = new HTable(conf, TABLE_NAME_BYTES); - int i = 1; - List puts = new ArrayList(); - for (String labelExp : labelExps) { - Put put = new Put(Bytes.toBytes("row" + i)); - put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); - put.setCellVisibility(new CellVisibility(labelExp)); - put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); - puts.add(put); - i++; - } - table.put(puts); - } finally { - if (table != null) { - table.flushCommits(); - } + Table table = TEST_UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME)); + int i = 1; + List puts = new ArrayList(); + for (String labelExp : labelExps) { + Put put = new Put(Bytes.toBytes("row" + i)); + put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); + put.setCellVisibility(new CellVisibility(labelExp)); + put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); + puts.add(put); + i++; } + table.put(puts); return table; } // A simple BaseRegionbserver impl that allows to add a non-visibility tag from the diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java index 0d87dc2..11015a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.ArrayList; import java.util.HashSet; - import java.util.List; import java.util.Map; import java.util.Set; @@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -670,20 +670,23 @@ public class SnapshotTestingUtils { public 
static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows, byte[]... families) throws IOException, InterruptedException { - loadData(util, new HTable(util.getConfiguration(), tableName), rows, families); + try (BufferedMutator mutator = util.getConnection().getBufferedMutator(tableName)) { + loadData(util, mutator, rows, families); + } } - public static void loadData(final HBaseTestingUtility util, final Table table, int rows, + public static void loadData(final HBaseTestingUtility util, final BufferedMutator mutator, int rows, byte[]... families) throws IOException, InterruptedException { - table.setAutoFlushTo(false); - // Ensure one row per region assertTrue(rows >= KEYS.length); for (byte k0: KEYS) { byte[] k = new byte[] { k0 }; byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k); byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.mutate(createPut(families1, key1, value1)); rows--; } @@ -691,22 +694,24 @@ public class SnapshotTestingUtils { while (rows-- > 0) { byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows)); byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.mutate(createPut(families1, key1, value1)); } - table.flushCommits(); + mutator.flush(); - waitForTableToBeOnline(util, table.getName()); + waitForTableToBeOnline(util, mutator.getName()); } - private static void putData(final Table table, final byte[][] families, - final byte[] key, final byte[] value) throws IOException { + private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { byte[] q = Bytes.toBytes("q"); Put put = new Put(key); put.setDurability(Durability.SKIP_WAL); for (byte[] family: families) { put.add(family, q, value); } - table.put(put); + return put; } public static void deleteAllSnapshots(final Admin admin) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java index a1f4605..710d4ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java @@ -37,12 +37,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.ipc.AbstractRpcClient; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -70,7 +70,6 @@ public class TestFlushSnapshotFromClient { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final int NUM_RS = 2; private 
static final byte[] TEST_FAM = Bytes.toBytes("fam"); - private static final byte[] TEST_QUAL = Bytes.toBytes("q"); private static final TableName TABLE_NAME = TableName.valueOf("test"); private final int DEFAULT_NUM_ROWS = 100; @@ -139,8 +138,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), @@ -178,8 +176,9 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME); - UTIL.loadTable(table, TEST_FAM); + try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) { + UTIL.loadTable(table, TEST_FAM); + } LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), @@ -222,8 +221,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - HTable table = new HTable(UTIL.getConfiguration(), TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java index 6b0f5e4..26de7d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -101,8 +101,8 @@ public class TestRestoreFlushSnapshotFromClient { // create Table and disable it SnapshotTestingUtils.createTable(UTIL, tableName, FAMILY); - HTable table = new HTable(UTIL.getConfiguration(), tableName); - SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY); + SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY); + Table table = UTIL.getConnection().getTable(tableName); snapshot0Rows = UTIL.countRows(table); LOG.info("=== before snapshot with 500 rows"); logFSTree(); @@ -115,7 +115,7 @@ public class TestRestoreFlushSnapshotFromClient { logFSTree(); // insert more data - SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY); + SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY); snapshot1Rows = UTIL.countRows(table); LOG.info("=== before snapshot with 1000 rows"); logFSTree(); -- 1.9.3 (Apple Git-50)
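The hunks above repeat a small set of client API migrations, and the sketches that follow restate them outside the test harness. First, the write-buffer pattern: tests that used to call setAutoFlushTo(false), put row by row, and finish with flushCommits() now either hand a whole List of Puts to Table.put or move the buffering onto a BufferedMutator taken from the shared Connection, as SnapshotTestingUtils.loadData does after this patch. The sketch below is a minimal illustration of the BufferedMutator form; the table name "demo", the family "f", and the row contents are assumptions for the example, not values from this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedMutatorSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("demo");   // assumed, pre-existing table
    byte[] family = Bytes.toBytes("f");                // assumed column family
    byte[] qualifier = Bytes.toBytes("q");
    // One Connection per process; the BufferedMutator it hands out may be
    // shared by many writer threads.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
      for (int i = 0; i < 1000; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.add(family, qualifier, Bytes.toBytes("value-" + i));
        // Edits collect in the write buffer and are sent when it fills,
        // on flush(), or on close().
        mutator.mutate(put);
      }
      mutator.flush();   // explicit counterpart of the removed flushCommits()
    }
  }
}

Closing the mutator also flushes any remaining edits, so the explicit flush() is only needed when the writer wants the data sent while it keeps the mutator open for more work.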
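Second, handle lifecycle: where the old tests built HTable and HBaseAdmin instances directly from a Configuration, the updated ones open short-lived Table and Admin handles from a shared Connection and let try-with-resources close them, as in the TestFromClientSide, TestMaster, and TestEndToEndSplitTransaction hunks. A condensed sketch under the same assumed table name.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class TableLifecycleSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("demo");   // assumed, pre-existing table
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Admin and Table are lightweight, single-threaded handles; open them
      // where needed and close them promptly, keeping only the Connection alive.
      try (Admin admin = connection.getAdmin()) {
        System.out.println("table exists: " + admin.tableExists(tableName));
      }
      try (Table table = connection.getTable(tableName)) {
        Result result = table.get(new Get(Bytes.toBytes("row-0")));
        System.out.println("row-0 empty: " + result.isEmpty());
      }
    }
  }
}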
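Third, batching and the renamed Delete methods: loops that issued one put(Put) per row with auto-flush off, followed by flushCommits(), become a single put(List) call, and the deprecated deleteFamily / deleteColumn calls become addFamily / addColumn, as in the TestDistributedLogSplitting, TestMultiParallel, and TestRpcControllerFactory hunks. A sketch reusing the assumed names from above.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchedWriteSketch {
  static void loadAndClean(Connection connection) throws IOException {
    byte[] family = Bytes.toBytes("f");     // assumed column family
    byte[] qualifier = Bytes.toBytes("q");
    try (Table table = connection.getTable(TableName.valueOf("demo"))) {
      // Collect the edits locally, then send them as one batch.
      List<Put> puts = new ArrayList<>();
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.add(family, qualifier, Bytes.toBytes(i));
        puts.add(put);
      }
      table.put(puts);

      // deleteFamily/deleteColumn are deprecated; addFamily/addColumn replace them.
      Delete delete = new Delete(Bytes.toBytes("row-0"));
      delete.addFamily(family);
      table.delete(delete);
    }
  }
}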
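Finally, region metadata: the TestRegionServerMetrics hunks stop iterating HTable.getRegionLocations() and instead walk the HRegionLocation list returned by a RegionLocator opened from the Connection. A minimal sketch with the same assumed table name.

import java.io.IOException;

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;

public class RegionLocatorSketch {
  static void printRegions(Connection connection) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(TableName.valueOf("demo"))) {
      for (HRegionLocation location : locator.getAllRegionLocations()) {
        HRegionInfo region = location.getRegionInfo();
        System.out.println(region.getEncodedName() + " on " + location.getServerName());
      }
    }
  }
}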