diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java new file mode 100644 index 0000000..17e9cc9 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -0,0 +1,114 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + *

Used to communicate with a single HBase table similar to {@link HTable} but meant for + * batched, potentially asynchronous puts. Obtain an instance from a {@link Connection} and call + * {@link #close()} afterwards. + *

+ * + *

Map/Reduce jobs are good use cases for using BufferedMutator. Map/Reduce jobs benefit from + * batching, but have no natural flush point. BufferedMutator receives the puts from the M/R job and + * will batch puts based on some heuristic, such as the accumulated size of the puts, and submit + * batches of puts asynchronously so that the M/R logic can continue without interruption. + *
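For illustration, a rough sketch of the Map/Reduce pattern described above (a hypothetical mapper; the table name, column names and input types are illustrative and not part of this patch):

    import java.io.IOException;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical mapper: one BufferedMutator per task, closed (and therefore flushed) in cleanup().
    public class BufferedPutMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
      private Connection connection;
      private BufferedMutator mutator;

      @Override
      protected void setup(Context context) throws IOException {
        connection = ConnectionFactory.createConnection(context.getConfiguration());
        mutator = connection.getBufferedMutator(TableName.valueOf("usertable"));
      }

      @Override
      protected void map(LongWritable key, Text value, Context context) throws IOException {
        Put put = new Put(Bytes.toBytes(key.get()));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(value.toString()));
        mutator.put(put); // buffered client-side, sent asynchronously as the buffer fills
      }

      @Override
      protected void cleanup(Context context) throws IOException {
        mutator.close();  // flushes anything still buffered
        connection.close();
      }
    }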

+ * + *

BufferedMutator can also be used in more exotic circumstances. Map/Reduce batch jobs will have a + * single BufferedMutator per thread. A single BufferedMutator can also be used effectively in high-volume + * online systems to batch puts, with the caveat that extreme circumstances, such as JVM or machine + * failure, may cause some data loss.

+ * + *

NOTE: This class replaces the functionality that used to be available in HTable via + * setAutoFlush(false). + *
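For illustration, a minimal sketch of the replacement pattern (assumes an open Connection and an existing table "mytable" with family "f"; not part of this patch):

    // Old pattern being replaced:
    //   table.setAutoFlush(false); ... table.put(put); ... table.flushCommits();
    //
    // New pattern -- minimal sketch under the assumptions above.
    static void writeBuffered(Connection conn) throws IOException {
      try (BufferedMutator mutator = conn.getBufferedMutator(TableName.valueOf("mytable"))) {
        Put put = new Put(Bytes.toBytes("row-1"));
        put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        mutator.put(put); // buffered client-side, sent as part of a batch
        mutator.flush();  // optional: force the buffered puts to the cluster now
      }                   // close() performs a final flush and releases resources
    }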

+ * + * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface BufferedMutator extends Closeable { + /** + * Gets the fully qualified table name instance of the table that this BufferedMutator writes to. + */ + TableName getName(); + + /** + * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance. + *

+ * The reference returned is not a copy, so any change made to it will + * affect this instance. + */ + Configuration getConfiguration(); + + /** + * Puts some data in the table. The puts will be buffered and sent over the wire as part of a + * batch. + * + * @param put The data to put. + * @throws IOException if a remote or network exception occurs. + */ + void put(Put put) throws IOException; + + /** + * Puts some data in the table. The puts will be buffered and sent over the wire as part of a + * batch. + * + * @param puts The data to put. + * @throws IOException if a remote or network exception occurs. + */ + void put(List puts) throws IOException; + + /** + * Performs a {@link #flush()} and releases any resources held. + * + * @throws IOException if a remote or network exception occurs. + */ + @Override + void close() throws IOException; + + /** + * Executes all the buffered, asynchronous {@link Put} operations and waits until they are done. + *

+ * Note that a flush is also triggered automatically whenever the accumulated size of the + * buffered {@link Put}s exceeds the write buffer size. + * @throws IOException if a remote or network exception occurs. + */ + void flush() throws IOException; + + /** + * Returns the maximum size in bytes of the write buffer for this BufferedMutator. + *

+ * The default value comes from the configuration parameter + * {@code hbase.client.write.buffer}. + * @return The size of the write buffer in bytes. + */ + long getWriteBufferSize(); +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorConfig.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorConfig.java new file mode 100644 index 0000000..9d0786e --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorConfig.java @@ -0,0 +1,103 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Parameters for instantiating a {@link BufferedMutator}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class BufferedMutatorConfig { + + private int writeBufferSize = -1; + private boolean isMultithreaded = false; + private ExecutorService pool = null; + private BufferedMutatorExceptionListener listener = new BufferedMutatorExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException exception, + BufferedMutator bufferedMutator) + throws RetriesExhaustedWithDetailsException { + throw exception; + } + }; + + /** + * @return the writeBufferSize + */ + public int getWriteBufferSize() { + return writeBufferSize; + } + + /** + * @param writeBufferSize + * the writeBufferSize to set + */ + public BufferedMutatorConfig withWriteBufferSize(int writeBufferSize) { + this.writeBufferSize = writeBufferSize; + return this; + } + + /** + * Should the + */ + public boolean isMultithreaded() { + return isMultithreaded; + } + + public BufferedMutatorConfig thatIsMultithreaded(boolean isMultithreaded) { + this.isMultithreaded = isMultithreaded; + return this; + } + + /** + * @return the pool + */ + public ExecutorService getPool() { + return pool; + } + + /** + * @param pool + * the pool to set + */ + public BufferedMutatorConfig withPool(ExecutorService pool) { + this.pool = pool; + return this; + } + + /** + * @return the listener + */ + public BufferedMutatorExceptionListener getListener() { + return listener; + } + + /** + * @param listener + * the listener to set + */ + public BufferedMutatorConfig withListener(BufferedMutatorExceptionListener listener) { + this.listener = listener; + return this; + } + +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorExceptionListener.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorExceptionListener.java new file mode 100644 index 0000000..20f4c78 --- /dev/null +++ 
hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorExceptionListener.java @@ -0,0 +1,30 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Listens for asynchronous exceptions on a {@link BufferedMutator}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface BufferedMutatorExceptionListener { + public void onException(RetriesExhaustedWithDetailsException exception, + BufferedMutator mutator) + throws RetriesExhaustedWithDetailsException; +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java new file mode 100644 index 0000000..b247940 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -0,0 +1,293 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.util.DoNothingLock; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; + +/** + *

+ * Used to communicate with a single HBase table similar to {@link HTable} + * but meant for batched, potentially asynchronous puts. Obtain an instance from + * a {@link Connection} and call {@link #close()} afterwards. + *

+ * + * @see ConnectionFactory + * @see Connection + * @since 1.0.0 + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class BufferedMutatorImpl implements BufferedMutator { + + private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); + + private BufferedMutatorExceptionListener listener; + + protected ClusterConnection connection; + private final TableName tableName; + private volatile Configuration configuration; + private TableConfiguration tableConfiguration; + private List writeAsyncBuffer = new LinkedList(); + private long writeBufferSize; + protected long currentWriteBufferSize = 0; + private boolean closed = false; + private ExecutorService pool; + protected AsyncProcess ap; + private Lock lock; + + public BufferedMutatorImpl(TableName tableName, + ClusterConnection connection, + ExecutorService pool, + RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, + TableConfiguration tableConfig, + BufferedMutatorExceptionListener listener, + long writeBufferSize, + Lock lock) { + if (connection == null || connection.isClosed()) { + throw new IllegalArgumentException("Connection is null or closed."); + } + this.tableName = tableName; + this.connection = connection; + this.configuration = connection.getConfiguration(); + this.pool = pool; + this.tableConfiguration = tableConfig; + this.listener = listener; + this.lock = lock; + + this.writeBufferSize = writeBufferSize; + + // puts need to track errors globally due to how the APIs currently work. + ap = new AsyncProcess(connection, + configuration, + pool, + rpcCallerFactory, + true, + rpcControllerFactory); + } + + @VisibleForTesting + BufferedMutatorImpl(TableConfiguration tableConfiguration) { + this.tableConfiguration = tableConfiguration; + connection = null; + tableName = null; + lock = new DoNothingLock(); + } + + /** + * {@inheritDoc} + */ + @Override + public TableName getName() { + return tableName; + } + + /** + * {@inheritDoc} + */ + @Override + public Configuration getConfiguration() { + return configuration; + } + + /** + * {@inheritDoc} + */ + @Override + public synchronized void put(Put put) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + lock.lock(); + try { + doPut(put); + } finally { + lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void put(List puts) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + lock.lock(); + try { + for (Put put : puts) { + doPut(put); + } + } finally { + lock.unlock(); + } + } + + /** + * Add the put to the buffer. If the buffer is already too large, sends the buffer to the cluster. + * + * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. + * @throws InterruptedIOException if we were interrupted. + */ + private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { + if (closed) { + throw new IllegalStateException("Cannot put when the BulkMutator is closed."); + } + + // This behavior is highly non-intuitive... it does not protect us against + // 94-incompatible behavior, which is a timing issue because hasError, the below code + // and setter of hasError are not synchronized. Perhaps it should be removed. 
+ if (ap.hasError()) { + writeAsyncBuffer.add(put); + backgroundFlushCommits(true); + } + + validatePut(put); + + currentWriteBufferSize += put.heapSize(); + writeAsyncBuffer.add(put); + + while (currentWriteBufferSize > writeBufferSize) { + backgroundFlushCommits(false); + } + } + + // validate for well-formedness + public void validatePut(final Put put) throws IllegalArgumentException { + HTable.validatePut(put, tableConfiguration.getMaxKeyValueSize()); + } + + /** + * {@inheritDoc} + */ + @Override + public void close() throws IOException { + if (this.closed) { + return; + } + this.lock.lock(); + try { + // As we can have an operation in progress even if the buffer is empty, we call + // backgroundFlushCommits at least one time. + backgroundFlushCommits(true); + this.pool.shutdown(); + boolean terminated = false; + do { + // wait until the pool has terminated + terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS); + } while (!terminated); + } catch (InterruptedException e) { + LOG.warn("waitForTermination interrupted"); + } finally { + this.closed = true; + this.lock.unlock(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException { + lock.lock(); + try { + // As we can have an operation in progress even if the buffer is empty, we call + // backgroundFlushCommits at least one time. + backgroundFlushCommits(true); + } finally { + lock.unlock(); + } + } + + /** + * Send the operations in the buffer to the servers. Does not wait for the server's answer. If the + * is an error (max retried reach from a previous flush or bad operation), it tries to send all + * operations in the buffer and sends an exception. + * + * @param synchronous - if true, sends all the writes and wait for all of them to finish before + * returning. + */ + private void backgroundFlushCommits(boolean synchronous) throws InterruptedIOException, + RetriesExhaustedWithDetailsException { + try { + if (!synchronous) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + if (ap.hasError()) { + LOG.debug(tableName + ": One or more of the operations have failed -" + + " waiting for all operation in progress to finish (successfully or not)"); + } + } + if (synchronous || ap.hasError()) { + while (!writeAsyncBuffer.isEmpty()) { + ap.submit(tableName, writeAsyncBuffer, true, null, false); + } + RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); + if (error != null) { + if (listener == null) { + throw error; + } else { + this.listener.onException(error, this); + } + } + } + } finally { + currentWriteBufferSize = 0; + for (Row mut : writeAsyncBuffer) { + if (mut instanceof Mutation) { + currentWriteBufferSize += ((Mutation) mut).heapSize(); + } + } + } + } + + /** + * This is used for legacy purposes in HTable only. This ought not be called for prodcution + * uses. 
+ */ + public void setWriteBufferSize(long writeBufferSize) throws RetriesExhaustedWithDetailsException, + InterruptedIOException { + this.writeBufferSize = writeBufferSize; + if (currentWriteBufferSize > writeBufferSize) { + flush(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public long getWriteBufferSize() { + return this.writeBufferSize; + } + + public List getWriteBuffer() { + return this.writeAsyncBuffer; + } +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/CachingConnection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/CachingConnection.java new file mode 100644 index 0000000..0488d76 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/CachingConnection.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * A connection that caches a map of TableName to Table and TableName to BulkMutator. 
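For illustration, a small sketch of the caching behavior this class provides (assumes a Configuration named conf and an existing table "t"; not part of this patch):

    // Repeated lookups hand back the same cached instance; close() tears everything down.
    Connection caching = new CachingConnection(ConnectionFactory.createConnection(conf));
    BufferedMutator first = caching.getBufferedMutator(TableName.valueOf("t"));
    BufferedMutator second = caching.getBufferedMutator(TableName.valueOf("t"));
    assert first == second;  // the cached BufferedMutator is reused for the same table
    caching.close();         // closes every cached mutator and table, then the wrapped connection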
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class CachingConnection implements Connection { + + private final Connection adapted; + + private ConcurrentHashMap tables = new ConcurrentHashMap<>(); + private ConcurrentHashMap bulkMutators = new ConcurrentHashMap<>(); + + public CachingConnection(Connection conn) { + adapted = conn; + } + + @Override + public void abort(String why, Throwable e) { + adapted.abort(why, e); + } + + @Override + public boolean isAborted() { + return adapted.isAborted(); + } + + @Override + public Configuration getConfiguration() { + return adapted.getConfiguration(); + } + + @Override + public Table getTable(TableName tableName) throws IOException { + Table table = tables.get(tableName); + if (table == null) { + table = adapted.getTable(tableName); + Table existing = tables.putIfAbsent(tableName, table); + if (existing != null) { + table.close(); + table = existing; + } + } + return table; + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + BufferedMutator mutator = bulkMutators.get(tableName); + if (mutator == null) { + mutator = adapted.getBufferedMutator(tableName); + BufferedMutator existing = bulkMutators.putIfAbsent(tableName, mutator); + if (existing != null) { + mutator.close(); + mutator = existing; + } + } + return mutator; + } + + @Override + public Table getTable(TableName tableName, ExecutorService pool) throws IOException { + throw new UnsupportedOperationException("You must use the getTable(TableName) method instead."); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName, BufferedMutatorConfig params) { + throw new UnsupportedOperationException( + "You must use the getBulkMutator(TableName) method instead."); + } + + @Override + public RegionLocator getRegionLocator(TableName tableName) throws IOException { + return adapted.getRegionLocator(tableName); + } + + @Override + public Admin getAdmin() throws IOException { + return adapted.getAdmin(); + } + + @Override + public void close() throws IOException { + for (BufferedMutator bulkMutator : bulkMutators.values()) { + bulkMutator.close(); + } + for (Table table : tables.values()) { + table.close(); + } + adapted.close(); + } + + @Override + public boolean isClosed() { + return adapted.isClosed(); + } +} diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java index 55237be..bd8ff72 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java @@ -99,6 +99,45 @@ public interface Connection extends Abortable, Closeable { Table getTable(TableName tableName, ExecutorService pool) throws IOException; /** + *

+ * Retrieve a {@link BufferedMutator} implementation for performing bulk + * writes to a table. The {@link BufferedMutator} returned by this method is + * not thread-safe. This BufferedMutator will use the Connection's + * ExecutorService. This object can be used for long-lived operations. + *

+ *

+ * The caller is responsible for calling {@link BufferedMutator#close()} on + * the returned {@link BufferedMutator} instance. + *

+ *

+ * This accessor will use the connection's ExecutorService and will throw an + * exception in the main thread when an asynchronous exception occurs. + * + * @param tableName + * the name of the table + * + * @return a {@link BufferedMutator} for the supplied tableName. + * @throws IOException if a remote or network exception occurs. + */ + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException; + + /** + * Retrieve a {@link BufferedMutator} implementation for performing bulk + * writes to a table. The {@link BufferedMutator} is thread-safe based on the + * provided Lock object. This object can be used for long-lived operations.
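For illustration, a sketch of the config-based accessor below, combining the builder-style BufferedMutatorConfig and a custom exception listener added by this patch (the table name and the log-and-swallow policy are illustrative assumptions):

    // Sketch only: a caller that overrides the write buffer size, opts into locking, and
    // handles asynchronous failures itself instead of having them rethrown in its own thread.
    static BufferedMutator configuredMutator(Connection connection) throws IOException {
      BufferedMutatorConfig config = new BufferedMutatorConfig()
          .withWriteBufferSize(8 * 1024 * 1024)   // overrides hbase.client.write.buffer
          .thatIsMultithreaded(true)              // guards puts with a ReentrantLock
          .withListener(new BufferedMutatorExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator m)
                throws RetriesExhaustedWithDetailsException {
              // Example policy only: log and swallow instead of rethrowing to the caller.
              System.err.println("Failed " + e.getNumExceptions() + " puts to " + m.getName());
            }
          });
      return connection.getBufferedMutator(TableName.valueOf("mytable"), config);
    }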
+ * The caller is responsible for calling {@link BufferedMutator#close()} on + * the returned {@link BufferedMutator} instance. + * + * @param tableName + * the name of the table + * @param params + * details on how to instantiation the BulkMutator. + * @return + */ + public BufferedMutator getBufferedMutator(TableName tableName, BufferedMutatorConfig params) + throws IOException; + + /** * Retrieve a RegionLocator implementation to inspect region information on a table. The returned * RegionLocator is not thread-safe, so a new instance should be created for each using thread. * diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java index 53c1271..4793661 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java @@ -110,6 +110,17 @@ abstract class ConnectionAdapter implements ClusterConnection { } @Override + public BufferedMutator getBufferedMutator(TableName tableName, BufferedMutatorConfig params) + throws IOException { + return wrappedConnection.getBufferedMutator(tableName, params); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { + return wrappedConnection.getBufferedMutator(tableName); + } + + @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { return wrappedConnection.getRegionLocator(tableName); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 5db92eb..89895f9 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -39,6 +39,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -69,8 +71,8 @@ import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionOpeningException; -import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; @@ -169,6 +171,7 @@ import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DoNothingLock; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Threads; @@ -717,6 +720,24 @@ class ConnectionManager { } @Override + public BufferedMutator getBufferedMutator(TableName tableName, BufferedMutatorConfig config) { + ExecutorService pool = config.getPool() 
== null + ? HTable.getDefaultExecutor(getConfiguration()) + : config.getPool(); + long writeBufferSize = config.getWriteBufferSize() >= 0 + ? config.getWriteBufferSize() + : tableConfig.getWriteBufferSize(); + Lock lock = config.isMultithreaded() ? new ReentrantLock() : new DoNothingLock(); + return new BufferedMutatorImpl(tableName, this, pool, rpcCallerFactory, rpcControllerFactory, + tableConfig, config.getListener(), writeBufferSize, lock); + } + + @Override + public BufferedMutator getBufferedMutator(TableName tableName) { + return getBufferedMutator(tableName, new BufferedMutatorConfig()); + } + + @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { return new HRegionLocator(tableName, this); } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 68d3f9f..3855730 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -70,6 +69,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DoNothingLock; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -112,10 +112,8 @@ public class HTable implements HTableInterface { private final TableName tableName; private volatile Configuration configuration; private TableConfiguration tableConfiguration; - protected List writeAsyncBuffer = new LinkedList(); - private long writeBufferSize; + protected BufferedMutatorImpl mutator; private boolean autoFlush = true; - protected long currentWriteBufferSize = 0 ; private boolean closed = false; protected int scannerCaching; private ExecutorService pool; // For Multi & Scan @@ -125,8 +123,6 @@ public class HTable implements HTableInterface { private Consistency defaultConsistency = Consistency.STRONG; private HRegionLocator locator; - /** The Async process for puts with autoflush set to false or multiputs */ - protected AsyncProcess ap; /** The Async process for batch */ protected AsyncProcess multiAp; private RpcRetryingCallerFactory rpcCallerFactory; @@ -331,6 +327,7 @@ public class HTable implements HTableInterface { tableConfiguration = new TableConfiguration(); cleanupPoolOnClose = false; cleanupConnectionOnClose = false; + this.mutator = new BufferedMutatorImpl(tableConfiguration); } /** @@ -350,7 +347,6 @@ public class HTable implements HTableInterface { this.operationTimeout = tableName.isSystemTable() ? tableConfiguration.getMetaOperationTimeout() : tableConfiguration.getOperationTimeout(); - this.writeBufferSize = tableConfiguration.getWriteBufferSize(); this.scannerCaching = tableConfiguration.getScannerCaching(); if (this.rpcCallerFactory == null) { @@ -361,7 +357,6 @@ public class HTable implements HTableInterface { } // puts need to track errors globally due to how the APIs currently work. 
- ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory); multiAp = this.connection.getAsyncProcess(); this.locator = new HRegionLocator(getName(), connection); } @@ -537,7 +532,7 @@ public class HTable implements HTableInterface { */ @Deprecated public List getWriteBuffer() { - return writeAsyncBuffer; + return mutator == null ? null : mutator.getWriteBuffer(); } /** @@ -640,7 +635,7 @@ public class HTable implements HTableInterface { * This is mainly useful for the MapReduce integration. * @return A map of HRegionInfo with it's server address * @throws IOException if a remote or network exception occurs - * + * * @deprecated Use {@link RegionLocator#getAllRegionLocations()} instead; */ @Deprecated @@ -997,11 +992,11 @@ public class HTable implements HTableInterface { /** * {@inheritDoc} + * @throws IOException */ @Override - public void put(final Put put) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - doPut(put); + public void put(final Put put) throws IOException { + getBulkMutator().put(put); if (autoFlush) { flushCommits(); } @@ -1009,82 +1004,16 @@ public class HTable implements HTableInterface { /** * {@inheritDoc} + * @throws IOException */ @Override - public void put(final List puts) - throws InterruptedIOException, RetriesExhaustedWithDetailsException { - for (Put put : puts) { - doPut(put); - } + public void put(final List puts) throws IOException { + getBulkMutator().put(puts); if (autoFlush) { flushCommits(); } } - - /** - * Add the put to the buffer. If the buffer is already too large, sends the buffer to the - * cluster. - * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster. - * @throws InterruptedIOException if we were interrupted. - */ - private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // This behavior is highly non-intuitive... it does not protect us against - // 94-incompatible behavior, which is a timing issue because hasError, the below code - // and setter of hasError are not synchronized. Perhaps it should be removed. - if (ap.hasError()) { - writeAsyncBuffer.add(put); - backgroundFlushCommits(true); - } - - validatePut(put); - - currentWriteBufferSize += put.heapSize(); - writeAsyncBuffer.add(put); - - while (currentWriteBufferSize > writeBufferSize) { - backgroundFlushCommits(false); - } - } - - - /** - * Send the operations in the buffer to the servers. Does not wait for the server's answer. - * If the is an error (max retried reach from a previous flush or bad operation), it tries to - * send all operations in the buffer and sends an exception. - * @param synchronous - if true, sends all the writes and wait for all of them to finish before - * returning. 
- */ - private void backgroundFlushCommits(boolean synchronous) throws - InterruptedIOException, RetriesExhaustedWithDetailsException { - - try { - if (!synchronous) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - if (ap.hasError()) { - LOG.debug(tableName + ": One or more of the operations have failed -" + - " waiting for all operation in progress to finish (successfully or not)"); - } - } - if (synchronous || ap.hasError()) { - while (!writeAsyncBuffer.isEmpty()) { - ap.submit(tableName, writeAsyncBuffer, true, null, false); - } - RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); - if (error != null) { - throw error; - } - } - } finally { - currentWriteBufferSize = 0; - for (Row mut : writeAsyncBuffer) { - if (mut instanceof Mutation) { - currentWriteBufferSize += ((Mutation) mut).heapSize(); - } - } - } - } - /** * {@inheritDoc} */ @@ -1448,12 +1377,11 @@ public class HTable implements HTableInterface { /** * {@inheritDoc} + * @throws IOException */ @Override - public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException { - // As we can have an operation in progress even if the buffer is empty, we call - // backgroundFlushCommits at least one time. - backgroundFlushCommits(true); + public void flushCommits() throws IOException { + getBulkMutator().flush(); } /** @@ -1552,14 +1480,6 @@ public class HTable implements HTableInterface { * {@inheritDoc} */ @Override - public void setAutoFlushTo(boolean autoFlush) { - this.autoFlush = autoFlush; - } - - /** - * {@inheritDoc} - */ - @Override public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { this.autoFlush = autoFlush; } @@ -1573,7 +1493,7 @@ public class HTable implements HTableInterface { */ @Override public long getWriteBufferSize() { - return writeBufferSize; + return getBulkMutator().getWriteBufferSize(); } /** @@ -1586,10 +1506,8 @@ public class HTable implements HTableInterface { */ @Override public void setWriteBufferSize(long writeBufferSize) throws IOException { - this.writeBufferSize = writeBufferSize; - if(currentWriteBufferSize > writeBufferSize) { - flushCommits(); - } + getBulkMutator(); + mutator.setWriteBufferSize(writeBufferSize); } /** @@ -1907,4 +1825,16 @@ public class HTable implements HTableInterface { public RegionLocator getRegionLocator() { return this.locator; } + + @VisibleForTesting + BufferedMutator getBulkMutator() { + if (mutator == null) { + try { + this.mutator = (BufferedMutatorImpl) connection.getBufferedMutator(tableName); + } catch (IOException e) { + LOG.error("Could not instantiate the buffered mutator.", e); + } + } + return mutator; + } } diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java index 911e034..20f8191 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java @@ -111,6 +111,50 @@ public interface HTableInterface extends Table { @Deprecated void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail); + + /** + * Tells whether or not 'auto-flush' is turned on. + * + * @return {@code true} if 'auto-flush' is enabled (default), meaning + * {@link Put} operations don't get buffered/delayed and are immediately + * executed. + */ + @Deprecated + boolean isAutoFlush(); + + /** + * Executes all the buffered {@link Put} operations. + *

+ * This method gets called once automatically for every {@link Put} or batch + * of {@link Put}s (when put(List) is used) when + * {@link #isAutoFlush} is {@code true}. + * @throws IOException if a remote or network exception occurs. + */ + @Deprecated + void flushCommits() throws IOException; + + /** + * Returns the maximum size in bytes of the write buffer for this HTable. + *

+ * The default value comes from the configuration parameter + * {@code hbase.client.write.buffer}. + * @return The size of the write buffer in bytes. + */ + @Deprecated + long getWriteBufferSize(); + + /** + * Sets the size of the buffer in bytes. + *

+ * If the new size is less than the current amount of data in the + * write buffer, the buffer gets flushed. + * @param writeBufferSize The new write buffer size, in bytes. + * @throws IOException if a remote or network exception occurs. + */ + @Deprecated + void setWriteBufferSize(long writeBufferSize) throws IOException; + + /** * Return the row that matches row exactly, * or the one that immediately precedes it. diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index a408b1d..55fb1c4 100644 --- hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -219,9 +219,7 @@ public interface Table extends Closeable { /** * Puts some data in the table. - *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. + * * @param put The data to put. * @throws IOException if a remote or network exception occurs. * @since 0.20.0 @@ -231,9 +229,6 @@ public interface Table extends Closeable { /** * Puts some data in the table, in batch. *

- * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered - * until the internal buffer is full. - *

* This can be used for group commit, or for submitting user defined * batches. The writeBuffer will be periodically inspected while the List * is processed, so depending on the List size the writeBuffer may flush @@ -498,30 +493,6 @@ public interface Table extends Closeable { final Batch.Callback callback) throws ServiceException, Throwable; /** - * Tells whether or not 'auto-flush' is turned on. - * - * @return {@code true} if 'auto-flush' is enabled (default), meaning - * {@link Put} operations don't get buffered/delayed and are immediately - * executed. - */ - boolean isAutoFlush(); - - /** - * Executes all the buffered {@link Put} operations. - *

- * This method gets called once automatically for every {@link Put} or batch - * of {@link Put}s (when put(List) is used) when - * {@link #isAutoFlush} is {@code true}. - * @throws IOException if a remote or network exception occurs. - */ - void flushCommits() throws IOException; - - /** - * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail} - */ - void setAutoFlushTo(boolean autoFlush); - - /** * Returns the maximum size in bytes of the write buffer for this HTable. *

* The default value comes from the configuration parameter @@ -540,7 +511,6 @@ public interface Table extends Closeable { */ void setWriteBufferSize(long writeBufferSize) throws IOException; - /** * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all diff --git hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java new file mode 100644 index 0000000..6302f75 --- /dev/null +++ hbase-client/src/main/java/org/apache/hadoop/hbase/util/DoNothingLock.java @@ -0,0 +1,61 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutatorImpl; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +/** + * An implementation of {@link Lock} that doesn't actually lock anything. {@link BufferedMutatorImpl} uses + * a Lock, but there are cases where the mutator will only be used in a single thread. 
In those + * cases there is no need to incur the the cost of a Lock, so + */ +@InterfaceAudience.Private +@InterfaceStability.Stable +public class DoNothingLock implements Lock { + + @Override + public void lock() { + } + + @Override + public void lockInterruptibly() throws InterruptedException { + } + + @Override + public boolean tryLock() { + return false; + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + return false; + } + + @Override + public void unlock() { + } + + @Override + public Condition newCondition() { + return null; + } +} diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 88a95fb..206bca1 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -155,8 +155,8 @@ public class TestAsyncProcess { new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); } - public MyAsyncProcess( - ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) { + public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, + @SuppressWarnings("unused") boolean dummy) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue(), new CountingThreadFactory(new AtomicInteger())) { @Override @@ -649,7 +649,7 @@ public class TestAsyncProcess { @Test public void testHTablePutSuccess() throws Exception { - HTable ht = Mockito.mock(HTable.class); + BufferedMutatorImpl ht = Mockito.mock(BufferedMutatorImpl.class); ht.ap = new MyAsyncProcess(createHConnection(), conf, true); Put put = createPut(1, true); @@ -662,8 +662,7 @@ public class TestAsyncProcess { private void doHTableFailedPut(boolean bufferOn) throws Exception { HTable ht = new HTable(); MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; - ht.setAutoFlushTo(true); + ht.mutator.ap = ap; if (bufferOn) { ht.setWriteBufferSize(1024L * 1024L); } else { @@ -672,7 +671,7 @@ public class TestAsyncProcess { Put put = createPut(1, false); - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); try { ht.put(put); if (bufferOn) { @@ -681,7 +680,7 @@ public class TestAsyncProcess { Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals(0L, ht.currentWriteBufferSize); + Assert.assertEquals(0L, ht.mutator.currentWriteBufferSize); // The table should have sent one request, maybe after multiple attempts AsyncRequestFuture ars = null; for (AsyncRequestFuture someReqs : ap.allReqs) { @@ -708,14 +707,13 @@ public class TestAsyncProcess { @Test public void testHTableFailedPutAndNewPut() throws Exception { - HTable ht = new HTable(); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(new TableConfiguration()); MyAsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, true); - ht.ap = ap; - ht.setAutoFlushTo(false); - ht.setWriteBufferSize(0); + mutator.ap = ap; + mutator.setWriteBufferSize(0); Put p = createPut(1, false); - ht.put(p); + mutator.put(p); ap.waitUntilDone(); // Let's do all the retries. @@ -725,13 +723,13 @@ public class TestAsyncProcess { // puts, we may raise an exception in the middle of the list. It's then up to the caller to // manage what was inserted, what was tried but failed, and what was not even tried. 
p = createPut(1, true); - Assert.assertEquals(0, ht.writeAsyncBuffer.size()); + Assert.assertEquals(0, mutator.getWriteBuffer().size()); try { - ht.put(p); + mutator.put(p); Assert.fail(); } catch (RetriesExhaustedException expected) { } - Assert.assertEquals("the put should not been inserted.", 0, ht.writeAsyncBuffer.size()); + Assert.assertEquals("the put should not been inserted.", 0, mutator.getWriteBuffer().size()); } @@ -793,26 +791,25 @@ public class TestAsyncProcess { @Test public void testErrorsServers() throws IOException { - HTable ht = new HTable(); + BufferedMutatorImpl mutator = new BufferedMutatorImpl(new TableConfiguration()); Configuration configuration = new Configuration(conf); configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true); // set default writeBufferSize - ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152)); + mutator.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152)); - ht.connection = new MyConnectionImpl(configuration); - MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true); - ht.ap = ap; + mutator.connection = new MyConnectionImpl(configuration); + MyAsyncProcess ap = new MyAsyncProcess(mutator.connection, configuration, true); + mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); - Assert.assertTrue(ht.ap.serverTrackerTimeout > 200); - ht.ap.serverTrackerTimeout = 1; + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); + Assert.assertTrue(mutator.ap.serverTrackerTimeout > 200); + mutator.ap.serverTrackerTimeout = 1; Put p = createPut(1, false); - ht.setAutoFlushTo(false); - ht.put(p); + mutator.put(p); try { - ht.flushCommits(); + mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { } @@ -822,19 +819,18 @@ public class TestAsyncProcess { @Test public void testGlobalErrors() throws IOException { - HTable ht = new HTable(); - ht.connection = new MyConnectionImpl(conf); - AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf); - ht.ap = ap; + BufferedMutatorImpl mutator = new BufferedMutatorImpl(new TableConfiguration()); + mutator.connection = new MyConnectionImpl(conf); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(mutator.connection, conf); + mutator.ap = ap; - Assert.assertNotNull(ht.ap.createServerErrorTracker()); + Assert.assertNotNull(mutator.ap.createServerErrorTracker()); Put p = createPut(1, true); - ht.setAutoFlushTo(false); - ht.put(p); + mutator.put(p); try { - ht.flushCommits(); + mutator.flush(); Assert.fail(); } catch (RetriesExhaustedWithDetailsException expected) { } @@ -867,7 +863,7 @@ public class TestAsyncProcess { MyAsyncProcess ap = new MyAsyncProcess(con, conf, con.nbThreads); ht.multiAp = ap; - ht.batch(gets); + ht.batch(gets, new Object[gets.size()]); Assert.assertEquals(ap.nbActions.get(), NB_REGS); Assert.assertEquals("1 multi response per server", 2, ap.nbMultiResponse.get()); diff --git hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index 43e0b75..2001213 100644 --- hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import 
org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DoNothingLock; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.Tool; @@ -711,36 +712,47 @@ public class TestClientNoCluster extends Configured implements Tool { * @throws IOException */ static void cycle(int id, final Configuration c, final Connection sharedConnection) throws IOException { - Table table = sharedConnection.getTable(TableName.valueOf(BIG_USER_TABLE)); - table.setAutoFlushTo(false); long namespaceSpan = c.getLong("hbase.test.namespace.span", 1000000); long startTime = System.currentTimeMillis(); final int printInterval = 100000; Random rd = new Random(id); boolean get = c.getBoolean("hbase.test.do.gets", false); - try { - Stopwatch stopWatch = new Stopwatch(); - stopWatch.start(); - for (int i = 0; i < namespaceSpan; i++) { - byte [] b = format(rd.nextLong()); - if (get){ + TableName tableName = TableName.valueOf(BIG_USER_TABLE); + if (get) { + try (Table table = sharedConnection.getTable(tableName)){ + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + for (int i = 0; i < namespaceSpan; i++) { + byte [] b = format(rd.nextLong()); Get g = new Get(b); table.get(g); - } else { + if (i % printInterval == 0) { + LOG.info("Get " + printInterval + "/" + stopWatch.elapsedMillis()); + stopWatch.reset(); + stopWatch.start(); + } + } + LOG.info("Finished a cycle putting " + namespaceSpan + " in " + + (System.currentTimeMillis() - startTime) + "ms"); + } + } else { + try (BufferedMutator mutator = sharedConnection.getBufferedMutator(tableName)) { + Stopwatch stopWatch = new Stopwatch(); + stopWatch.start(); + for (int i = 0; i < namespaceSpan; i++) { + byte [] b = format(rd.nextLong()); Put p = new Put(b); p.add(HConstants.CATALOG_FAMILY, b, b); - table.put(p); + mutator.put(p); + if (i % printInterval == 0) { + LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis()); + stopWatch.reset(); + stopWatch.start(); + } } - if (i % printInterval == 0) { - LOG.info("Put " + printInterval + "/" + stopWatch.elapsedMillis()); - stopWatch.reset(); - stopWatch.start(); + LOG.info("Finished a cycle putting " + namespaceSpan + " in " + + (System.currentTimeMillis() - startTime) + "ms"); } - } - LOG.info("Finished a cycle putting " + namespaceSpan + " in " + - (System.currentTimeMillis() - startTime) + "ms"); - } finally { - table.close(); } } diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java index 931fba4..fc1a31b 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedList.java @@ -18,7 +18,18 @@ package org.apache.hadoop.hbase.test; -import com.google.common.collect.Sets; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -40,6 +51,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.MasterNotRunningException; 
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorConfig; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -87,17 +100,7 @@ import org.apache.hadoop.util.ToolRunner; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.collect.Sets; /** * This is an integration test borrowed from goraci, written by Keith Turner, @@ -340,7 +343,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { byte[] id; long count = 0; int i; - Table table; + BufferedMutator mutator; Connection connection; long numNodes; long wrap; @@ -363,14 +366,13 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } protected void instantiateHTable() throws IOException { - table = connection.getTable(getTableName(connection.getConfiguration())); - table.setAutoFlushTo(false); - table.setWriteBufferSize(4 * 1024 * 1024); + mutator = connection.getBufferedMutator(getTableName(connection.getConfiguration()), + new BufferedMutatorConfig().withWriteBufferSize(4 * 1024 * 1024)); } @Override protected void cleanup(Context context) throws IOException ,InterruptedException { - table.close(); + mutator.close(); connection.close(); } @@ -421,7 +423,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { if (id != null) { put.add(FAMILY_NAME, COLUMN_CLIENT, id); } - table.put(put); + mutator.put(put); if (i % 1000 == 0) { // Tickle progress every so often else maprunner will think us hung @@ -429,7 +431,7 @@ public class IntegrationTestBigLinkedList extends IntegrationTestBase { } } - table.flushCommits(); + mutator.flush(); } } diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java index 50c638a..e136ac8 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestBigLinkedListWithVisibility.java @@ -38,13 +38,11 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.chaos.factories.MonkeyFactory; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -187,7 +185,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB protected void instantiateHTable() throws 
IOException { for (int i = 0; i < DEFAULT_TABLES_COUNT; i++) { Table table = connection.getTable(getTableName(i)); - table.setAutoFlushTo(true); //table.setWriteBufferSize(4 * 1024 * 1024); this.tables[i] = table; } @@ -233,9 +230,6 @@ public class IntegrationTestBigLinkedListWithVisibility extends IntegrationTestB output.progress(); } } - for (int j = 0; j < DEFAULT_TABLES_COUNT; j++) { - tables[j].flushCommits(); - } } } } diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java index 6e10ba9..e276853 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestLoadAndVerify.java @@ -42,12 +42,11 @@ import org.apache.hadoop.hbase.IntegrationTestBase; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.BufferedMutatorConfig; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -113,8 +112,6 @@ public class IntegrationTestLoadAndVerify extends IntegrationTestBase { private static final int SCANNER_CACHING = 500; - protected IntegrationTestingUtility util; - private String toRun = null; private enum Counters { @@ -168,7 +165,7 @@ public void cleanUpCluster() throws Exception { { protected long recordsToWrite; protected Connection connection; - protected Table table; + protected BufferedMutator mutator; protected Configuration conf; protected int numBackReferencesPerRow; @@ -184,9 +181,8 @@ public void cleanUpCluster() throws Exception { String tableName = conf.get(TABLE_NAME_KEY, TABLE_NAME_DEFAULT); numBackReferencesPerRow = conf.getInt(NUM_BACKREFS_KEY, NUM_BACKREFS_DEFAULT); this.connection = ConnectionFactory.createConnection(conf); - table = connection.getTable(TableName.valueOf(tableName)); - table.setWriteBufferSize(4*1024*1024); - table.setAutoFlushTo(false); + mutator = connection.getBufferedMutator(TableName.valueOf(tableName), + new BufferedMutatorConfig().withWriteBufferSize(4 * 1024 * 1024)); String taskId = conf.get("mapreduce.task.attempt.id"); Matcher matcher = Pattern.compile(".+_m_(\\d+_\\d+)").matcher(taskId); @@ -201,8 +197,7 @@ public void cleanUpCluster() throws Exception { @Override public void cleanup(Context context) throws IOException { - table.flushCommits(); - table.close(); + mutator.close(); connection.close(); } @@ -235,7 +230,7 @@ public void cleanUpCluster() throws Exception { refsWritten.increment(1); } rowsWritten.increment(1); - table.put(p); + mutator.put(p); if (i % 100 == 0) { context.setStatus("Written " + i + "/" + recordsToWrite + " records"); @@ -244,7 +239,7 @@ public void cleanUpCluster() throws Exception { } // End of block, flush all of them before we start writing anything // pointing to these! 
- table.flushCommits(); + mutator.flush(); } } } @@ -320,7 +315,7 @@ public void cleanUpCluster() throws Exception { NMapInputFormat.setNumMapTasks(conf, conf.getInt(NUM_MAP_TASKS_KEY, NUM_MAP_TASKS_DEFAULT)); conf.set(TABLE_NAME_KEY, htd.getTableName().getNameAsString()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJobName(TEST_NAME + " Load for " + htd.getTableName()); job.setJarByClass(this.getClass()); setMapperClass(job); @@ -344,7 +339,7 @@ public void cleanUpCluster() throws Exception { protected void doVerify(Configuration conf, HTableDescriptor htd) throws Exception { Path outputDir = getTestDir(TEST_NAME, "verify-output"); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(this.getClass()); job.setJobName(TEST_NAME + " Verification for " + htd.getTableName()); setJobScannerConf(job); @@ -398,7 +393,7 @@ public void cleanUpCluster() throws Exception { // Only disable and drop if we succeeded to verify - otherwise it's useful // to leave it around for post-mortem - getTestingUtil(getConf()).deleteTable(htd.getName()); + getTestingUtil(getConf()).deleteTable(htd.getTableName()); } public void usage() { @@ -454,15 +449,17 @@ public void cleanUpCluster() throws Exception { HTableDescriptor htd = new HTableDescriptor(table); htd.addFamily(new HColumnDescriptor(TEST_FAMILY)); - Admin admin = new HBaseAdmin(getConf()); - if (doLoad) { - admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits); - doLoad(getConf(), htd); + try (Connection conn = ConnectionFactory.createConnection(getConf()); + Admin admin = conn.getAdmin()) { + if (doLoad) { + admin.createTable(htd, Bytes.toBytes(0L), Bytes.toBytes(-1L), numPresplits); + doLoad(getConf(), htd); + } } if (doVerify) { doVerify(getConf(), htd); if (doDelete) { - getTestingUtil(getConf()).deleteTable(htd.getName()); + getTestingUtil(getConf()).deleteTable(htd.getTableName()); } } return 0; diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java index 96743c8..0880d87 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestWithCellVisibilityLoadAndVerify.java @@ -176,7 +176,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT p.add(TEST_FAMILY, TEST_QUALIFIER, HConstants.EMPTY_BYTE_ARRAY); p.setCellVisibility(new CellVisibility(exp)); getCounter(expIdx).increment(1); - table.put(p); + mutator.put(p); if (i % 100 == 0) { context.setStatus("Written " + i + "/" + recordsToWrite + " records"); @@ -185,7 +185,7 @@ public class IntegrationTestWithCellVisibilityLoadAndVerify extends IntegrationT } // End of block, flush all of them before we start writing anything // pointing to these! 
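The mapper changes above all follow the same recipe: the write path moves from an autoflush-disabled Table to a BufferedMutator obtained from the shared Connection, flush() marks the points where flushCommits() used to sit, and close() drains whatever is left in cleanup(). A minimal standalone sketch of that write path follows; it reuses the BufferedMutatorConfig/withWriteBufferSize names exactly as this patch spells them (the shipped API may name them differently), and the table, family and qualifier are invented for illustration.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.BufferedMutatorConfig;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BufferedWriteSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("usertable"); // hypothetical table
        Connection connection = ConnectionFactory.createConnection(conf);
        // Replaces table.setAutoFlushTo(false) + table.setWriteBufferSize(4 * 1024 * 1024).
        BufferedMutator mutator = connection.getBufferedMutator(tableName,
            new BufferedMutatorConfig().withWriteBufferSize(4 * 1024 * 1024));
        try {
          for (int i = 0; i < 10000; i++) {
            Put put = new Put(Bytes.toBytes("row-" + i));
            put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i)); // placeholder column
            mutator.put(put); // buffered client-side, shipped in batches
          }
          mutator.flush(); // where flushCommits() used to be called
        } finally {
          mutator.close(); // drains anything still buffered
          connection.close();
        }
      }
    }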
- table.flushCommits(); + mutator.flush(); } } diff --git hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java index 1f313c3..c3c3709 100644 --- hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java +++ hbase-it/src/test/java/org/apache/hadoop/hbase/trace/IntegrationTestSendTraceRequests.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -234,12 +234,11 @@ public class IntegrationTestSendTraceRequests extends AbstractHBaseTool { private LinkedBlockingQueue insertData() throws IOException, InterruptedException { LinkedBlockingQueue rowKeys = new LinkedBlockingQueue(25000); - Table ht = util.getConnection().getTable(this.tableName); + BufferedMutator ht = util.getConnection().getBufferedMutator(this.tableName); byte[] value = new byte[300]; for (int x = 0; x < 5000; x++) { TraceScope traceScope = Trace.startSpan("insertData", Sampler.ALWAYS); try { - ht.setAutoFlushTo(false); for (int i = 0; i < 5; i++) { long rk = random.nextLong(); rowKeys.add(rk); diff --git hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java index 5836442..dad5a32 100644 --- hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java +++ hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RowResource.java @@ -227,7 +227,6 @@ public class RowResource extends ResourceBase { } table = servlet.getTable(tableResource.getName()); table.put(puts); - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); @@ -489,7 +488,6 @@ public class RowResource extends ResourceBase { .type(MIMETYPE_TEXT).entity("Value not Modified" + CRLF) .build(); } - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulPutRequests(1); return response.build(); @@ -580,7 +578,6 @@ public class RowResource extends ResourceBase { .type(MIMETYPE_TEXT).entity(" Delete check failed." 
+ CRLF) .build(); } - table.flushCommits(); ResponseBuilder response = Response.ok(); servlet.getMetrics().incrementSucessfulDeleteRequests(1); return response.build(); diff --git hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 65bf509..724d295 100644 --- hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -816,11 +816,6 @@ public class RemoteHTable implements HTableInterface { } @Override - public void setAutoFlushTo(boolean autoFlush) { - throw new UnsupportedOperationException("setAutoFlushTo not implemented"); - } - - @Override public long getWriteBufferSize() { throw new UnsupportedOperationException("getWriteBufferSize not implemented"); } diff --git hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java index b02f069..2d6a9ea 100644 --- hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java +++ hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/PerformanceEvaluation.java @@ -49,15 +49,16 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.Filter; @@ -137,7 +138,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private int presplitRegions = 0; private boolean useTags = false; private int noOfTags = 1; - private HConnection connection; + private Connection connection; private static final Path PERF_EVAL_DIR = new Path("performance_evaluation"); /** @@ -501,7 +502,7 @@ public class PerformanceEvaluation extends Configured implements Tool { value.getRows(), value.getTotalRows(), value.isFlushCommits(), value.isWriteToWAL(), value.isUseTags(), value.getNoOfTags(), - HConnectionManager.createConnection(context.getConfiguration()), status); + ConnectionFactory.createConnection(context.getConfiguration()), status); // Collect how much time the thing took. Report as map output and // to the ELAPSED_TIME counter. 
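The PerformanceEvaluation hunks here and below swap HConnectionManager.createConnection for ConnectionFactory.createConnection and pass a single Connection through the test options, so each client thread derives its own short-lived Table or BufferedMutator from it. A rough sketch of that sharing pattern, under the usual assumption that the Connection is the heavyweight, thread-safe handle while the per-thread objects are not shared (table and column names are invented):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.BufferedMutator;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SharedConnectionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        final TableName tableName = TableName.valueOf("TestTable"); // hypothetical
        int numClientThreads = 4;
        // One Connection for the whole run; each worker opens and closes its own
        // lightweight BufferedMutator against it.
        try (final Connection connection = ConnectionFactory.createConnection(conf)) {
          Thread[] threads = new Thread[numClientThreads];
          for (int i = 0; i < numClientThreads; i++) {
            final int client = i;
            threads[i] = new Thread("TestClient-" + i) {
              @Override
              public void run() {
                try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
                  Put put = new Put(Bytes.toBytes("client-" + client));
                  put.add(Bytes.toBytes("info"), Bytes.toBytes("data"), Bytes.toBytes(client));
                  mutator.put(put);
                } catch (IOException e) {
                  throw new RuntimeException(e);
                }
              }
            };
            threads[i].start();
          }
          for (Thread t : threads) {
            t.join();
          }
        }
      }
    }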
context.getCounter(Counter.ELAPSED_TIME).increment(elapsedTime); @@ -609,7 +610,7 @@ public class PerformanceEvaluation extends Configured implements Tool { final int preSplitRegions = this.presplitRegions; final boolean useTags = this.useTags; final int numTags = this.noOfTags; - final HConnection connection = HConnectionManager.createConnection(getConf()); + final Connection connection = ConnectionFactory.createConnection(getConf()); for (int i = 0; i < this.N; i++) { final int index = i; Thread t = new Thread ("TestClient-" + i) { @@ -684,7 +685,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, getClass().getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation"); @@ -790,14 +791,14 @@ public class PerformanceEvaluation extends Configured implements Tool { private boolean writeToWAL = true; private boolean useTags = false; private int noOfTags = 0; - private HConnection connection; + private Connection connection; TestOptions() { } TestOptions(int startRow, int perClientRunRows, int totalRows, int numClientThreads, TableName tableName, boolean flushCommits, boolean writeToWAL, boolean useTags, - int noOfTags, HConnection connection) { + int noOfTags, Connection connection) { this.startRow = startRow; this.perClientRunRows = perClientRunRows; this.totalRows = totalRows; @@ -838,7 +839,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return writeToWAL; } - public HConnection getConnection() { + public Connection getConnection() { return connection; } @@ -870,13 +871,11 @@ public class PerformanceEvaluation extends Configured implements Tool { protected final int totalRows; private final Status status; protected TableName tableName; - protected HTableInterface table; protected volatile Configuration conf; - protected boolean flushCommits; protected boolean writeToWAL; protected boolean useTags; protected int noOfTags; - protected HConnection connection; + protected Connection connection; /** * Note that all subclasses of this class must provide a public contructor @@ -889,9 +888,7 @@ public class PerformanceEvaluation extends Configured implements Tool { this.totalRows = options.getTotalRows(); this.status = status; this.tableName = options.getTableName(); - this.table = null; this.conf = conf; - this.flushCommits = options.isFlushCommits(); this.writeToWAL = options.isWriteToWAL(); this.useTags = options.isUseTags(); this.noOfTags = options.getNumTags(); @@ -907,18 +904,7 @@ public class PerformanceEvaluation extends Configured implements Tool { return period == 0? this.perClientRunRows: period; } - void testSetup() throws IOException { - this.table = connection.getTable(tableName); - this.table.setAutoFlushTo(false); - } - - void testTakedown() throws IOException { - if (flushCommits) { - this.table.flushCommits(); - } - table.close(); - } - + abstract void testTakedown() throws IOException; /* * Run test * @return Elapsed time. @@ -936,6 +922,8 @@ public class PerformanceEvaluation extends Configured implements Tool { return (System.nanoTime() - startTime) / 1000000; } + abstract void testSetup() throws IOException; + /** * Provides an extension point for tests that don't want a per row invocation. 
*/ @@ -957,8 +945,45 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException; } - @SuppressWarnings("unused") - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + public TableTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + } + + void testSetup() throws IOException { + this.table = connection.getTable(tableName); + } + + @Override + void testTakedown() throws IOException { + table.close(); + } + } + + static abstract class BufferedMutatorTest extends Test { + protected BufferedMutator mutator; + protected boolean flushCommits; + + public BufferedMutatorTest(Configuration conf, TestOptions options, Status status) { + super(conf, options, status); + this.flushCommits = options.isFlushCommits(); + } + + void testSetup() throws IOException { + this.mutator = connection.getBufferedMutator(tableName); + } + + void testTakedown() throws IOException { + if (flushCommits) { + this.mutator.flush(); + } + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -981,7 +1006,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } @SuppressWarnings("unused") - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1065,7 +1090,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { RandomReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1085,7 +1110,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BufferedMutatorTest { RandomWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1109,11 +1134,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.put(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Configuration conf, TestOptions options, Status status) { @@ -1141,7 +1166,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); } @@ -1155,7 +1180,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BufferedMutatorTest { SequentialWriteTest(Configuration conf, TestOptions options, Status status) { super(conf, options, status); @@ -1180,11 +1205,11 @@ public class PerformanceEvaluation extends Configured implements Tool { put.add(FAMILY_NAME, QUALIFIER_NAME, value); } put.setDurability(writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.put(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Configuration conf, TestOptions options, Status status) { @@ -1268,7 +1293,7 @@ public class PerformanceEvaluation extends Configured implements Tool { long runOneClient(final Class cmd, final int startRow, final int perClientRunRows, final int totalRows, boolean flushCommits, boolean writeToWAL, boolean useTags, int noOfTags, - HConnection connection, final Status status) + Connection connection, final Status status) throws IOException { status.setStatus("Start " + cmd + " at offset " + startRow + " for " + perClientRunRows + " rows"); @@ -1463,7 +1488,7 @@ public class PerformanceEvaluation extends Configured implements Tool { continue; } - this.connection = HConnectionManager.createConnection(getConf()); + this.connection = ConnectionFactory.createConnection(getConf()); final String useTags = "--usetags="; if (cmd.startsWith(useTags)) { diff --git hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java index eb1fc98..297162b 100644 --- hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java +++ hbase-rest/src/test/java/org/apache/hadoop/hbase/rest/client/TestRemoteTable.java @@ -19,10 +19,10 @@ package org.apache.hadoop.hbase.rest.client; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import java.io.IOException; import java.util.ArrayList; @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -99,9 +98,7 @@ public class TestRemoteTable { htd.addFamily(new HColumnDescriptor(COLUMN_2).setMaxVersions(3)); htd.addFamily(new HColumnDescriptor(COLUMN_3).setMaxVersions(3)); admin.createTable(htd); - Table table = null; - try { - table = TEST_UTIL.getConnection().getTable(TABLE); + try (Table table = TEST_UTIL.getConnection().getTable(TABLE)) { Put put = new Put(ROW_1); put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1); table.put(put); @@ -110,9 +107,6 @@ public class TestRemoteTable { put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2); put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2); table.put(put); - table.flushCommits(); - } finally { - if (null != table) table.close(); } remoteTable = new RemoteHTable( new Client(new Cluster().add("localhost", @@ -349,7 +343,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_2, value2)); Delete delete = new Delete(ROW_3); - delete.deleteColumn(COLUMN_2, QUALIFIER_2); + delete.addColumn(COLUMN_2, QUALIFIER_2); remoteTable.delete(delete); get = new Get(ROW_3); @@ -464,7 +458,7 @@ public class TestRemoteTable { assertTrue(Bytes.equals(VALUE_1, value1)); assertNull(value2); assertTrue(remoteTable.exists(get)); - assertEquals(1, remoteTable.exists(Collections.singletonList(get)).length); + 
assertEquals(1, remoteTable.existsAll(Collections.singletonList(get)).length); Delete delete = new Delete(ROW_1); remoteTable.checkAndDelete(ROW_1, COLUMN_1, QUALIFIER_1, VALUE_1, delete); diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index eab4a8a..14a7e55 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -55,8 +55,7 @@ import com.google.protobuf.ServiceException; */ public class HTableWrapper implements HTableInterface { - private TableName tableName; - private final Table table; + private final HTableInterface table; private ClusterConnection connection; private final List openTables; @@ -73,7 +72,6 @@ public class HTableWrapper implements HTableInterface { private HTableWrapper(List openTables, TableName tableName, ClusterConnection connection, ExecutorService pool) throws IOException { - this.tableName = tableName; this.table = connection.getTable(tableName, pool); this.connection = connection; this.openTables = openTables; @@ -232,7 +230,7 @@ public class HTableWrapper implements HTableInterface { @Override public byte[] getTableName() { - return tableName.getName(); + return table.getTableName(); } @Override @@ -309,7 +307,7 @@ public class HTableWrapper implements HTableInterface { @Override public void setAutoFlush(boolean autoFlush) { - table.setAutoFlushTo(autoFlush); + table.setAutoFlush(autoFlush); } @Override @@ -327,11 +325,6 @@ public class HTableWrapper implements HTableInterface { } @Override - public void setAutoFlushTo(boolean autoFlush) { - table.setAutoFlushTo(autoFlush); - } - - @Override public long getWriteBufferSize() { return table.getWriteBufferSize(); } diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java index 563b1f8..e8d35cc 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapred/TableOutputFormat.java @@ -25,10 +25,10 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.InvalidJobConfException; @@ -52,38 +52,35 @@ public class TableOutputFormat extends FileOutputFormat { - private Table m_table; + private BufferedMutator m_mutator; /** * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the * lifecycle of {@code conn}. 
*/ - public TableRecordWriter(final Table table) throws IOException { - this.m_table = table; + public TableRecordWriter(final BufferedMutator mutator) throws IOException { + this.m_mutator = mutator; } public void close(Reporter reporter) throws IOException { - this.m_table.close(); + this.m_mutator.close(); } public void write(ImmutableBytesWritable key, Put value) throws IOException { - m_table.put(new Put(value)); + m_mutator.put(new Put(value)); } } @Override - public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, - Progressable progress) - throws IOException { + public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, + String name, Progressable progress) throws IOException { // expecting exactly one path TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE)); - Table table = null; // Connection is not closed. Dies with JVM. No possibility for cleanup. Connection connection = ConnectionFactory.createConnection(job); - table = connection.getTable(tableName); + BufferedMutator mutator = connection.getBufferedMutator(tableName); // Clear write buffer on fail is true by default so no need to reset it. - table.setAutoFlushTo(false); - return new TableRecordWriter(table); + return new TableRecordWriter(mutator); } @Override diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java index 0c8e76f..2ab6e8d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/MultiTableOutputFormat.java @@ -19,25 +19,25 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.CachingConnection; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; @@ -76,7 +76,6 @@ public class MultiTableOutputFormat extends OutputFormat { private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class); Connection connection; - Map tables; Configuration conf; boolean useWriteAheadLogging; @@ -91,37 +90,44 @@ public class MultiTableOutputFormat extends OutputFormat(); this.conf = conf; this.useWriteAheadLogging = useWriteAheadLogging; } /** - * @param tableName - * 
the name of the table, as a string + * @param tableName the name of the table, as a string * @return the named table - * @throws IOException - * if there is a problem opening a table + * @throws IOException if there is a problem opening a table + * + * @deprecated use {@link #getTableForDelete(ImmutableBytesWritable)} or + * {@link #getMutator(ImmutableBytesWritable)}. @ */ + @Deprecated Table getTable(ImmutableBytesWritable tableName) throws IOException { - if(this.connection == null){ - this.connection = ConnectionFactory.createConnection(conf); - } - if (!tables.containsKey(tableName)) { - LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing"); + initConnection(); + Table table = connection.getTable(TableName.valueOf(tableName.get())); + ((HTable) table).setAutoFlush(false); + return table; + } + + Table getTableForDelete(ImmutableBytesWritable tableName) throws IOException { + initConnection(); + return connection.getTable(TableName.valueOf(tableName.get())); + } - Table table = connection.getTable(TableName.valueOf(tableName.get())); - table.setAutoFlushTo(false); - tables.put(tableName, table); + BufferedMutator getMutator(ImmutableBytesWritable tableName) throws IOException { + initConnection(); + return connection.getBufferedMutator(TableName.valueOf(tableName.get())); + } + + protected void initConnection() throws IOException { + if(this.connection == null){ + this.connection = new CachingConnection(ConnectionFactory.createConnection(conf)); } - return tables.get(tableName); } @Override public void close(TaskAttemptContext context) throws IOException { - for (Table table : tables.values()) { - table.flushCommits(); - } if(connection != null){ connection.close(); } @@ -139,16 +145,14 @@ public class MultiTableOutputFormat extends OutputFormatmust be either a {@link Put} or a @@ -78,26 +80,29 @@ implements Configurable { /** The configuration. */ private Configuration conf = null; + private TableName tableName; + /** * Writes the reducer output to an HBase table. */ protected class TableRecordWriter extends RecordWriter { - private Connection connection; - private Table table; + private final Connection connection; + private final Table table; + private final BufferedMutator mutator; /** * @throws IOException * */ public TableRecordWriter() throws IOException { - String tableName = conf.get(OUTPUT_TABLE); - this.connection = ConnectionFactory.createConnection(conf); - this.table = connection.getTable(TableName.valueOf(tableName)); - this.table.setAutoFlushTo(false); + this.connection = new CachingConnection(ConnectionFactory.createConnection(conf)); + this.table = connection.getTable(tableName); + this.mutator = connection.getBufferedMutator(tableName); LOG.info("Created table instance for " + tableName); } + /** * Closes the writer, in this case flush table commits. 
* @@ -109,6 +114,7 @@ implements Configurable { public void close(TaskAttemptContext context) throws IOException { table.close(); + mutator.close(); connection.close(); } @@ -123,8 +129,8 @@ implements Configurable { @Override public void write(KEY key, Mutation value) throws IOException { - if (value instanceof Put) table.put(new Put((Put)value)); - else if (value instanceof Delete) table.delete(new Delete((Delete)value)); + if (value instanceof Put) mutator.put(new Put((Put) value)); + else if (value instanceof Delete) table.delete(new Delete((Delete) value)); else throw new IOException("Pass a Delete or a Put"); } } @@ -188,6 +194,7 @@ implements Configurable { if(tableName == null || tableName.length() <= 0) { throw new IllegalArgumentException("Must specify table name"); } + this.tableName = TableName.valueOf(tableName); String address = this.conf.get(QUORUM_ADDRESS); int zkClientPort = this.conf.getInt(QUORUM_PORT, 0); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java index 52424b3..73940f0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -49,13 +49,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -391,6 +390,7 @@ public class PerformanceEvaluation extends Configured implements Tool { throws IOException, InterruptedException { final Class cmd = determineCommandClass(opts.cmdName); assert cmd != null; + @SuppressWarnings("unchecked") Future[] threads = new Future[opts.numClientThreads]; RunResult[] results = new RunResult[opts.numClientThreads]; ExecutorService pool = Executors.newFixedThreadPool(opts.numClientThreads, @@ -456,7 +456,7 @@ public class PerformanceEvaluation extends Configured implements Tool { Path inputDir = writeInputFile(conf, opts); conf.set(EvaluationMapTask.CMD_KEY, cmd.getName()); conf.set(EvaluationMapTask.PE_KEY, PerformanceEvaluation.class.getName()); - Job job = new Job(conf); + Job job = Job.getInstance(conf); job.setJarByClass(PerformanceEvaluation.class); job.setJobName("HBase Performance Evaluation - " + opts.cmdName); @@ -939,7 +939,7 @@ public class PerformanceEvaluation extends Configured implements Tool { private final Sampler traceSampler; private final SpanReceiverHost receiverHost; protected Connection connection; - protected Table table; +// protected Table table; private String testName; private Histogram latency; @@ -1020,25 +1020,25 @@ public class PerformanceEvaluation extends Configured implements Tool { if (!opts.oneCon) { this.connection = ConnectionFactory.createConnection(conf); } - this.table = connection.getTable(TableName.valueOf(opts.tableName)); - this.table.setAutoFlushTo(opts.autoFlush); + onStartup(); latency = 
YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); valueSize = YammerHistogramUtils.newHistogram(new UniformSample(1024 * 500)); } + abstract void onStartup() throws IOException; + void testTakedown() throws IOException { reportLatency(); reportValueSize(); - if (opts.flushCommits) { - this.table.flushCommits(); - } - table.close(); + onTakedown(); if (!opts.oneCon) { connection.close(); } receiverHost.closeReceivers(); } + abstract void onTakedown() throws IOException; + /* * Run test * @return Elapsed time. @@ -1134,7 +1134,43 @@ public class PerformanceEvaluation extends Configured implements Tool { abstract void testRow(final int i) throws IOException, InterruptedException; } - static class RandomSeekScanTest extends Test { + static abstract class TableTest extends Test { + protected Table table; + + TableTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.table = connection.getTable(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + table.close(); + } + } + + static abstract class BulkMutatorTest extends Test { + protected BufferedMutator mutator; + + BulkMutatorTest(Connection con, TestOptions options, Status status) { + super(con, options, status); + } + + @Override + void onStartup() throws IOException { + this.mutator = connection.getBufferedMutator(TableName.valueOf(opts.tableName)); + } + + @Override + void onTakedown() throws IOException { + mutator.close(); + } + } + + static class RandomSeekScanTest extends TableTest { RandomSeekScanTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1164,7 +1200,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static abstract class RandomScanWithRangeTest extends Test { + static abstract class RandomScanWithRangeTest extends TableTest { RandomScanWithRangeTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1252,7 +1288,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomReadTest extends Test { + static class RandomReadTest extends TableTest { private final Consistency consistency; private ArrayList gets; private Random rd = new Random(); @@ -1306,7 +1342,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class RandomWriteTest extends Test { + static class RandomWriteTest extends BulkMutatorTest { RandomWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1332,11 +1368,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? 
Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.put(put); } } - static class ScanTest extends Test { + static class ScanTest extends TableTest { private ResultScanner testScanner; ScanTest(Connection con, TestOptions options, Status status) { @@ -1369,7 +1405,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } - static class SequentialReadTest extends Test { + static class SequentialReadTest extends TableTest { SequentialReadTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1385,7 +1421,7 @@ public class PerformanceEvaluation extends Configured implements Tool { } } - static class SequentialWriteTest extends Test { + static class SequentialWriteTest extends BulkMutatorTest { SequentialWriteTest(Connection con, TestOptions options, Status status) { super(con, options, status); } @@ -1411,11 +1447,11 @@ public class PerformanceEvaluation extends Configured implements Tool { updateValueSize(value.length); } put.setDurability(opts.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL); - table.put(put); + mutator.put(put); } } - static class FilteredScanTest extends Test { + static class FilteredScanTest extends TableTest { protected static final Log LOG = LogFactory.getLog(FilteredScanTest.class.getName()); FilteredScanTest(Connection con, TestOptions options, Status status) { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java index 711d592..278973e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/TestMultiVersions.java @@ -24,19 +24,17 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.NavigableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestCase.FlushCache; import org.apache.hadoop.hbase.HBaseTestCase.HTableIncommon; import org.apache.hadoop.hbase.HBaseTestCase.Incommon; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -46,7 +44,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -220,14 +217,16 @@ public class TestMultiVersions { } } // Insert data + List puts = new ArrayList<>(); for (int i = 0; i < startKeys.length; i++) { for (int j = 0; j < timestamp.length; j++) { Put put = new Put(rows[i], timestamp[j]); put.add(HConstants.CATALOG_FAMILY, null, timestamp[j], Bytes.toBytes(timestamp[j])); - table.put(put); + puts.add(put); } } + table.put(puts); // There are 5 cases we have to test. Each is described below. 
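Where a test knows its whole batch up front, the patch drops the autoflush-off/flushCommits dance in favour of collecting the Puts into a List and handing them to Table.put(List) in one call, as TestMultiVersions does above. A compact sketch of that pattern (family and qualifier names are placeholders):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ListPutSketch {
      // Build the whole batch first, then submit it in a single Table.put(List) call;
      // no autoflush toggling and no explicit flushCommits() needed afterwards.
      static void loadRows(Connection connection, TableName tableName, int rows) throws IOException {
        List<Put> puts = new ArrayList<Put>();
        for (int i = 0; i < rows; i++) {
          Put put = new Put(Bytes.toBytes("row-" + i));
          put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes(i)); // placeholder column
          puts.add(put);
        }
        try (Table table = connection.getTable(tableName)) {
          table.put(puts); // sends the whole batch; nothing left to flush
        }
      }
    }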
for (int i = 0; i < rows.length; i++) { for (int j = 0; j < timestamp.length; j++) { @@ -241,7 +240,6 @@ public class TestMultiVersions { } assertTrue(cellCount == 1); } - table.flushCommits(); } // Case 1: scan with LATEST_TIMESTAMP. Should get two rows diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java index e017bcc..2cb2cfc 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCloneSnapshotFromClient.java @@ -100,31 +100,30 @@ public class TestCloneSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - Table table = TEST_UTIL.getConnection().getTable(tableName); - try { - // enable table and insert data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); + // enable table and insert data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)){ snapshot0Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); + } + admin.disableTable(tableName); - // take a snapshot - admin.snapshot(snapshotName0, tableName); + // take a snapshot + admin.snapshot(snapshotName0, tableName); - // enable table and insert more data - admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); + // enable table and insert more data + admin.enableTable(tableName); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)){ snapshot1Rows = TEST_UTIL.countRows(table); - admin.disableTable(tableName); + } + admin.disableTable(tableName); - // take a snapshot of the updated table - admin.snapshot(snapshotName1, tableName); + // take a snapshot of the updated table + admin.snapshot(snapshotName1, tableName); - // re-enable table - admin.enableTable(tableName); - } finally { - table.close(); - } + // re-enable table + admin.enableTable(tableName); } protected int getNumReplicas() { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 0ebafaf..dd8215b 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -3903,7 +3903,7 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedOneFlush"), new byte [][] {CONTENTS_FAMILY, SMALL_FAMILY}); - table.setAutoFlushTo(false); + table.setAutoFlush(false); ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { byte[] row = Bytes.toBytes("row" + i); @@ -3934,6 +3934,7 @@ public class TestFromClientSide { Result row : scanner) nbRows++; assertEquals(NB_BATCH_ROWS * 10, nbRows); + table.close(); } @Test @@ -3944,7 +3945,6 @@ public class TestFromClientSide { final int NB_BATCH_ROWS = 10; HTable table = TEST_UTIL.createTable(Bytes.toBytes("testRowsPutBufferedManyManyFlushes"), new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY }); - table.setAutoFlushTo(false); table.setWriteBufferSize(10); ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS * 10; 
i++) { @@ -3956,8 +3956,6 @@ public class TestFromClientSide { } table.put(rowsUpdate); - table.flushCommits(); - Scan scan = new Scan(); scan.addFamily(CONTENTS_FAMILY); ResultScanner scanner = table.getScanner(scan); @@ -4146,6 +4144,7 @@ public class TestFromClientSide { HBaseAdmin ha = new HBaseAdmin(t.getConnection()); assertTrue(ha.tableExists(tableName)); assertTrue(t.get(new Get(ROW)).isEmpty()); + ha.close(); } /** @@ -4159,9 +4158,10 @@ public class TestFromClientSide { final TableName tableName = TableName.valueOf("testUnmanagedHConnectionReconnect"); HTable t = createUnmangedHConnectionHTable(tableName); Connection conn = t.getConnection(); - HBaseAdmin ha = new HBaseAdmin(conn); - assertTrue(ha.tableExists(tableName)); - assertTrue(t.get(new Get(ROW)).isEmpty()); + try (HBaseAdmin ha = new HBaseAdmin(conn)) { + assertTrue(ha.tableExists(tableName)); + assertTrue(t.get(new Get(ROW)).isEmpty()); + } // stop the master MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); @@ -4174,9 +4174,10 @@ public class TestFromClientSide { // test that the same unmanaged connection works with a new // HBaseAdmin and can connect to the new master; - HBaseAdmin newAdmin = new HBaseAdmin(conn); - assertTrue(newAdmin.tableExists(tableName)); - assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES + 1); + try (HBaseAdmin newAdmin = new HBaseAdmin(conn)) { + assertTrue(newAdmin.tableExists(tableName)); + assertTrue(newAdmin.getClusterStatus().getServersSize() == SLAVES + 1); + } } @Test @@ -4273,7 +4274,6 @@ public class TestFromClientSide { new byte[][] { HConstants.CATALOG_FAMILY, Bytes.toBytes("info2") }, 1, 1024); // set block size to 64 to making 2 kvs into one block, bypassing the walkForwardInSingleRow // in Store.rowAtOrBeforeFromStoreFile - table.setAutoFlushTo(true); String regionName = table.getRegionLocations().firstKey().getEncodedName(); HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableAname).getFromOnlineRegions(regionName); @@ -4348,6 +4348,8 @@ public class TestFromClientSide { assertTrue(result.containsColumn(HConstants.CATALOG_FAMILY, null)); assertTrue(Bytes.equals(result.getRow(), forthRow)); assertTrue(Bytes.equals(result.getValue(HConstants.CATALOG_FAMILY, null), four)); + + table.close(); } /** diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index bab78ab..abea699 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -149,7 +149,7 @@ public class TestMultiParallel { ThreadPoolExecutor executor = HTable.getDefaultExecutor(UTIL.getConfiguration()); try { try (Table t = connection.getTable(TEST_TABLE, executor)) { - List puts = constructPutRequests(); // creates a Put for every region + List puts = constructPutRequests(); // creates a Put for every region t.batch(puts); HashSet regionservers = new HashSet(); try (RegionLocator locator = connection.getRegionLocator(TEST_TABLE)) { @@ -172,7 +172,7 @@ public class TestMultiParallel { Table table = UTIL.getConnection().getTable(TEST_TABLE); // load test data - List puts = constructPutRequests(); + List puts = constructPutRequests(); table.batch(puts); // create a list of gets and run it @@ -262,16 +262,12 @@ public class TestMultiParallel { // Load the data LOG.info("get new table"); Table table = UTIL.getConnection().getTable(TEST_TABLE); - 
table.setAutoFlushTo(false); table.setWriteBufferSize(10 * 1024 * 1024); LOG.info("constructPutRequests"); - List puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } + List puts = constructPutRequests(); + table.put(puts); LOG.info("puts"); - table.flushCommits(); final int liveRScount = UTIL.getMiniHBaseCluster().getLiveRegionServerThreads() .size(); assert liveRScount > 0; @@ -290,11 +286,7 @@ public class TestMultiParallel { // try putting more keys after the abort. same key/qual... just validating // no exceptions thrown puts = constructPutRequests(); - for (Row put : puts) { - table.put((Put) put); - } - - table.flushCommits(); + table.put(puts); } LOG.info("validating loaded data"); @@ -332,7 +324,7 @@ public class TestMultiParallel { LOG.info("test=testBatchWithPut"); Table table = CONNECTION.getTable(TEST_TABLE); // put multiple rows using a batch - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -364,7 +356,7 @@ public class TestMultiParallel { Table table = UTIL.getConnection().getTable(TEST_TABLE); // Load some data - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -372,7 +364,7 @@ public class TestMultiParallel { List deletes = new ArrayList(); for (int i = 0; i < KEYS.length; i++) { Delete delete = new Delete(KEYS[i]); - delete.deleteFamily(BYTES_FAMILY); + delete.addFamily(BYTES_FAMILY); deletes.add(delete); } results = table.batch(deletes); @@ -393,7 +385,7 @@ public class TestMultiParallel { Table table = UTIL.getConnection().getTable(TEST_TABLE); // Load some data - List puts = constructPutRequests(); + List puts = constructPutRequests(); Object[] results = table.batch(puts); validateSizeAndEmpty(results, KEYS.length); @@ -665,8 +657,8 @@ public class TestMultiParallel { } } - private List constructPutRequests() { - List puts = new ArrayList(); + private List constructPutRequests() { + List puts = new ArrayList<>(); for (byte[] k : KEYS) { Put put = new Put(k); put.add(BYTES_FAMILY, QUALIFIER, VALUE); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java index a5dce02..c5e6449 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRestoreSnapshotFromClient.java @@ -111,11 +111,12 @@ public class TestRestoreSnapshotFromClient { // take an empty snapshot admin.snapshot(emptySnapshot, tableName); - Table table = TEST_UTIL.getConnection().getTable(tableName); // enable table and insert data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot0Rows = TEST_UTIL.countRows(table); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot0Rows = TEST_UTIL.countRows(table); + } admin.disableTable(tableName); // take a snapshot @@ -123,9 +124,10 @@ public class TestRestoreSnapshotFromClient { // enable table and insert more data admin.enableTable(tableName); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, FAMILY); - snapshot1Rows = TEST_UTIL.countRows(table); - table.close(); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, 
FAMILY); + try (Table table = TEST_UTIL.getConnection().getTable(tableName)) { + snapshot1Rows = TEST_UTIL.countRows(table); + } } @After @@ -184,7 +186,7 @@ public class TestRestoreSnapshotFromClient { assertEquals(2, table.getTableDescriptor().getFamilies().size()); HTableDescriptor htd = admin.getTableDescriptor(tableName); assertEquals(2, htd.getFamilies().size()); - SnapshotTestingUtils.loadData(TEST_UTIL, table, 500, TEST_FAMILY2); + SnapshotTestingUtils.loadData(TEST_UTIL, tableName, 500, TEST_FAMILY2); long snapshot2Rows = snapshot1Rows + 500; assertEquals(snapshot2Rows, TEST_UTIL.countRows(table)); assertEquals(500, TEST_UTIL.countRows(table, TEST_FAMILY2)); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java index 018bdc4..dcf26f2 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java @@ -133,17 +133,16 @@ public class TestRpcControllerFactory { Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(name); - table.setAutoFlushTo(false); byte[] row = Bytes.toBytes("row"); Put p = new Put(row); p.add(fam1, fam1, Bytes.toBytes("val0")); table.put(p); - table.flushCommits(); + Integer counter = 1; counter = verifyCount(counter); Delete d = new Delete(row); - d.deleteColumn(fam1, fam1); + d.addColumn(fam1, fam1); table.delete(d); counter = verifyCount(counter); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java index 4649961..317707a 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestHTableWrapper.java @@ -176,11 +176,11 @@ public class TestHTableWrapper { private void checkAutoFlush() { boolean initialAutoFlush = hTableInterface.isAutoFlush(); - hTableInterface.setAutoFlushTo(false); + hTableInterface.setAutoFlush(false); assertFalse(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(true); + hTableInterface.setAutoFlush(true); assertTrue(hTableInterface.isAutoFlush()); - hTableInterface.setAutoFlushTo(initialAutoFlush); + hTableInterface.setAutoFlush(initialAutoFlush); } private void checkBufferSize() throws IOException { diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java index f56811e..f237631 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java @@ -937,7 +937,6 @@ public class TestDistributedLogSplitting { if (key == null || key.length == 0) { key = new byte[] { 0, 0, 0, 0, 1 }; } - ht.setAutoFlushTo(true); Put put = new Put(key); put.add(Bytes.toBytes("family"), Bytes.toBytes("c1"), new byte[]{'b'}); ht.put(put); @@ -1629,11 +1628,11 @@ public class TestDistributedLogSplitting { /** * Load table with puts and deletes with expected values so that we can verify later */ - private void prepareData(final HTable t, final byte[] f, final byte[] column) throws IOException { - t.setAutoFlushTo(false); + private void prepareData(final Table 
t, final byte[] f, final byte[] column) throws IOException { byte[] k = new byte[3]; // add puts + List puts = new ArrayList<>(); for (byte b1 = 'a'; b1 <= 'z'; b1++) { for (byte b2 = 'a'; b2 <= 'z'; b2++) { for (byte b3 = 'a'; b3 <= 'z'; b3++) { @@ -1642,11 +1641,11 @@ public class TestDistributedLogSplitting { k[2] = b3; Put put = new Put(k); put.add(f, column, k); - t.put(put); + puts.add(put); } } } - t.flushCommits(); + t.put(puts); // add deletes for (byte b3 = 'a'; b3 <= 'z'; b3++) { k[0] = 'a'; @@ -1655,7 +1654,6 @@ public class TestDistributedLogSplitting { Delete del = new Delete(k); t.delete(del); } - t.flushCommits(); } private void waitForCounter(AtomicLong ctr, long oldval, long newval, diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java index d2f1eab..8028756 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMaster.java @@ -83,11 +83,11 @@ public class TestMaster { MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster(); HMaster m = cluster.getMaster(); - HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME); - assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, - TableState.State.ENABLED)); - TEST_UTIL.loadTable(ht, FAMILYNAME, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME)) { + assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME, + TableState.State.ENABLED)); + TEST_UTIL.loadTable(ht, FAMILYNAME, false); + } List> tableRegions = MetaTableAccessor.getTableRegionsAndLocations( m.getConnection(), TABLENAME); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index 4a6ba41..96b4342 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -99,9 +99,9 @@ public class TestEndToEndSplitTransaction { TableName tableName = TableName.valueOf("TestSplit"); byte[] familyName = Bytes.toBytes("fam"); - HTable ht = TEST_UTIL.createTable(tableName, familyName); - TEST_UTIL.loadTable(ht, familyName, false); - ht.close(); + try (HTable ht = TEST_UTIL.createTable(tableName, familyName)) { + TEST_UTIL.loadTable(ht, familyName, false); + } HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0); byte []firstRow = Bytes.toBytes("aaa"); byte []splitRow = Bytes.toBytes("lll"); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index c9608c9..b1c3692 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -202,23 +202,22 @@ public class TestFSErrorsExposed { util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // Make a new Configuration so it makes a new connection that has the // above configuration on it; else we use the old one w/ 10 as default. 
- Table table = util.getConnection().getTable(tableName); - - // Load some data - util.loadTable(table, fam, false); - table.flushCommits(); - util.flush(); - util.countRows(table); + try (Table table = util.getConnection().getTable(tableName)) { + // Load some data + util.loadTable(table, fam, false); + util.flush(); + util.countRows(table); - // Kill the DFS cluster - util.getDFSCluster().shutdownDataNodes(); + // Kill the DFS cluster + util.getDFSCluster().shutdownDataNodes(); - try { - util.countRows(table); - fail("Did not fail to count after removing data"); - } catch (Exception e) { - LOG.info("Got expected error", e); - assertTrue(e.getMessage().contains("Could not seek")); + try { + util.countRows(table); + fail("Did not fail to count after removing data"); + } catch (Exception e) { + LOG.info("Got expected error", e); + assertTrue(e.getMessage().contains("Could not seek")); + } } // Restart data nodes so that HBase can shut down cleanly. diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java index 8e0bd21..8e7fe04 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionFavoredNodes.java @@ -78,6 +78,7 @@ public class TestRegionFavoredNodes { @AfterClass public static void tearDownAfterClass() throws Exception { + table.close(); if (createWithFavoredNode == null) { return; } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java index b95b20a..aa071ef 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerMetrics.java @@ -31,6 +31,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; + import static org.junit.Assert.*; import java.io.IOException; @@ -109,10 +110,11 @@ public class TestRegionServerMetrics { TEST_UTIL.createTable(tName, cfName); - TEST_UTIL.getConnection().getTable(tName).close(); //wait for the table to come up. + Connection connection = TEST_UTIL.getConnection(); + connection.getTable(tName).close(); //wait for the table to come up. // Do a first put to be sure that the connection is established, meta is there and so on. 
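The metrics test also stops reaching into HTable.getRegionLocations() and instead walks the table's regions through a RegionLocator obtained from the Connection, which is the interface-level replacement for that HTable-only call. A small sketch of that lookup, assuming only the RegionLocator calls used in the hunks below plus HRegionLocation.getServerName():

    import java.io.IOException;

    import org.apache.hadoop.hbase.HRegionInfo;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.RegionLocator;

    public class RegionLocatorSketch {
      // Enumerate a table's regions through the Connection-scoped RegionLocator
      // instead of the HTable-specific getRegionLocations() map.
      static void printRegions(Connection connection, TableName tableName) throws IOException {
        try (RegionLocator locator = connection.getRegionLocator(tableName)) {
          for (HRegionLocation location : locator.getAllRegionLocations()) {
            HRegionInfo info = location.getRegionInfo();
            System.out.println(info.getEncodedName() + " on " + location.getServerName());
          }
        }
      }
    }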
- HTable table = (HTable) TEST_UTIL.getConnection().getTable(tName); + Table table = connection.getTable(tName); Put p = new Put(row); p.add(cfName, qualifier, initValue); table.put(p); @@ -141,19 +143,21 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 10, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - for ( HRegionInfo i:table.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - "_metric"; - metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); - metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + try (RegionLocator locator = connection.getRegionLocator(tName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_getNumOps", 10, agg); + metricsHelper.assertCounter(prefix + "_mutateCount", 31, agg); + } } - List gets = new ArrayList(); for (int i=0; i< 10; i++) { gets.add(new Get(row)); @@ -165,11 +169,11 @@ public class TestRegionServerMetrics { metricsHelper.assertCounter("readRequestCount", readRequests + 20, serverSource); metricsHelper.assertCounter("writeRequestCount", writeRequests + 30, serverSource); - table.setAutoFlushTo(false); + List puts = new ArrayList<>(); for (int i=0; i< 30; i++) { - table.put(p); + puts.add(p); } - table.flushCommits(); + table.put(puts); metricsRegionServer.getRegionServerWrapper().forceRecompute(); metricsHelper.assertCounter("totalRequestCount", requests + 80, serverSource); @@ -325,35 +329,39 @@ public class TestRegionServerMetrics { byte[] val = Bytes.toBytes("One"); - HTable t = TEST_UTIL.createTable(tableName, cf); - t.setAutoFlushTo(false); + List puts = new ArrayList<>(); for (int insertCount =0; insertCount < 100; insertCount++) { Put p = new Put(Bytes.toBytes("" + insertCount + "row")); p.add(cf, qualifier, val); - t.put(p); + puts.add(p); } - t.flushCommits(); - - Scan s = new Scan(); - s.setBatch(1); - s.setCaching(1); - ResultScanner resultScanners = t.getScanner(s); - - for (int nextCount = 0; nextCount < 30; nextCount++) { - Result result = resultScanners.next(); - assertNotNull(result); - assertEquals(1, result.size()); + try (HTable t = TEST_UTIL.createTable(tableName, cf)) { + t.put(puts); + + Scan s = new Scan(); + s.setBatch(1); + s.setCaching(1); + ResultScanner resultScanners = t.getScanner(s); + + for (int nextCount = 0; nextCount < 30; nextCount++) { + Result result = resultScanners.next(); + assertNotNull(result); + assertEquals(1, result.size()); + } } - for ( HRegionInfo i:t.getRegionLocations().keySet()) { - MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) - .getMetrics() - .getSource() - .getAggregateSource(); - String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ - "_table_"+tableNameString + - "_region_" + i.getEncodedName()+ - "_metric"; - metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + try (RegionLocator 
locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { + for ( HRegionLocation location: locator.getAllRegionLocations()) { + HRegionInfo i = location.getRegionInfo(); + MetricsRegionAggregateSource agg = rs.getRegion(i.getRegionName()) + .getMetrics() + .getSource() + .getAggregateSource(); + String prefix = "namespace_"+NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR+ + "_table_"+tableNameString + + "_region_" + i.getEncodedName()+ + "_metric"; + metricsHelper.assertCounter(prefix + "_scanNextNumOps", 30, agg); + } } } } diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java index bd5439a..86515a6 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java @@ -91,7 +91,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); @@ -172,19 +171,16 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); Put put1 = new Put(Bytes.toBytes("row2")); put1.add(new KeyValue(Bytes.toBytes("row2"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version0"))); table.put(put1); - table.flushCommits(); admin.flush(tableName); put0 = new Put(Bytes.toBytes("row1")); put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version1"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); admin.compact(tableName); @@ -221,7 +217,6 @@ public class TestScannerWithBulkload { put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes.toBytes("version0"))); table.put(put1); - table.flushCommits(); bulkload.doBulkLoad(hfilePath, (HTable) table); latch.countDown(); } catch (TableNotFoundException e) { @@ -263,7 +258,6 @@ public class TestScannerWithBulkload { put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes .toBytes("version3"))); table.put(put0); - table.flushCommits(); admin.flush(tableName); scanner = table.getScanner(scan); result = scanner.next(); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 1da3662..111acf3 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -312,7 +312,7 @@ public class TestLogRolling { admin.createTable(desc); Table table = TEST_UTIL.getConnection().getTable(desc.getTableName()); - assertTrue(table.isAutoFlush()); + assertTrue(((HTable) table).isAutoFlush()); server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName()); final FSHLog log = (FSHLog) server.getWAL(null); @@ -456,8 +456,6 @@ public class TestLogRolling { writeData(table, 1002); - table.setAutoFlushTo(true); - long curTime = System.currentTimeMillis(); LOG.info("log.getCurrentFileName()): " + 
DefaultWALProvider.getCurrentFileName(log)); long oldFilenum = DefaultWALProvider.extractFileNumFromWAL(log); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java index 7ca9fed..4bb1842 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java @@ -28,7 +28,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -54,7 +53,6 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(false); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for (JVMClusterUtil.RegionServerThread r : @@ -119,7 +117,10 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas Put put = new Put(row); put.add(famName, row, row); - htable1 = utility1.getConnection().getTable(tableName); + if (htable1 == null) { + htable1 = utility1.getConnection().getTable(tableName); + } + htable1.put(put); Get get = new Get(row); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index bfb01db..f0db865 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; @@ -70,7 +69,6 @@ public class TestReplicationSmallTests extends TestReplicationBase { */ @Before public void setUp() throws Exception { - htable1.setAutoFlushTo(true); // Starting and stopping replication can make us miss new logs, // rolling like this makes sure the most recent one gets added to the queue for ( JVMClusterUtil.RegionServerThread r : diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java index adf3c0e..f1e956c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWithTags.java @@ -151,7 +151,6 @@ public class TestReplicationWithTags { admin.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); } htable1 = utility1.getConnection().getTable(TABLE_NAME); - htable1.setWriteBufferSize(1024); htable2 = utility2.getConnection().getTable(TABLE_NAME); 
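The TestRegionServerMetrics hunks above replace the removed HTable.getRegionLocations() with a RegionLocator obtained from the Connection. A minimal sketch of that region enumeration, not taken from the patch; the method name printRegions is hypothetical.

import java.io.IOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionLocator;

  // Enumerate a table's regions through the Connection-scoped RegionLocator;
  // closing the locator does not close the Connection it came from.
  static void printRegions(Connection connection, TableName tableName) throws IOException {
    try (RegionLocator locator = connection.getRegionLocator(tableName)) {
      for (HRegionLocation location : locator.getAllRegionLocations()) {
        HRegionInfo info = location.getRegionInfo();
        System.out.println(info.getEncodedName() + " on " + location.getServerName());
      }
    }
  }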
} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java index 9ea64d1..c087f4e 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/security/visibility/TestVisibilityLabelsReplication.java @@ -399,25 +399,18 @@ public class TestVisibilityLabelsReplication { } static Table writeData(TableName tableName, String... labelExps) throws Exception { - Table table = null; - try { - table = TEST_UTIL.getConnection().getTable(TABLE_NAME); - int i = 1; - List puts = new ArrayList(); - for (String labelExp : labelExps) { - Put put = new Put(Bytes.toBytes("row" + i)); - put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); - put.setCellVisibility(new CellVisibility(labelExp)); - put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); - puts.add(put); - i++; - } - table.put(puts); - } finally { - if (table != null) { - table.flushCommits(); - } + Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME); + int i = 1; + List puts = new ArrayList(); + for (String labelExp : labelExps) { + Put put = new Put(Bytes.toBytes("row" + i)); + put.add(fam, qual, HConstants.LATEST_TIMESTAMP, value); + put.setCellVisibility(new CellVisibility(labelExp)); + put.setAttribute(NON_VISIBILITY, Bytes.toBytes(TEMP)); + puts.add(put); + i++; } + table.put(puts); return table; } // A simple BaseRegionbserver impl that allows to add a non-visibility tag from the diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java index 74f358d..23a1ab0 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/SnapshotTestingUtils.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.ArrayList; import java.util.HashSet; - import java.util.List; import java.util.Map; import java.util.Set; @@ -47,6 +46,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.BufferedMutator; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; @@ -673,20 +673,22 @@ public class SnapshotTestingUtils { public static void loadData(final HBaseTestingUtility util, final TableName tableName, int rows, byte[]... families) throws IOException, InterruptedException { - loadData(util, util.getConnection().getTable(tableName), rows, families); + BufferedMutator mutator = util.getConnection().getBufferedMutator(tableName); + loadData(util, mutator, rows, families); } - public static void loadData(final HBaseTestingUtility util, final Table table, int rows, + public static void loadData(final HBaseTestingUtility util, final BufferedMutator mutator, int rows, byte[]... 
families) throws IOException, InterruptedException { - table.setAutoFlushTo(false); - // Ensure one row per region assertTrue(rows >= KEYS.length); for (byte k0: KEYS) { byte[] k = new byte[] { k0 }; byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k); byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.put(createPut(families1, key1, value1)); rows--; } @@ -694,22 +696,24 @@ public class SnapshotTestingUtils { while (rows-- > 0) { byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows)); byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); - putData(table, families, key, value); + final byte[][] families1 = families; + final byte[] key1 = key; + final byte[] value1 = value; + mutator.put(createPut(families1, key1, value1)); } - table.flushCommits(); + mutator.flush(); - waitForTableToBeOnline(util, table.getName()); + waitForTableToBeOnline(util, mutator.getName()); } - private static void putData(final Table table, final byte[][] families, - final byte[] key, final byte[] value) throws IOException { + private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { byte[] q = Bytes.toBytes("q"); Put put = new Put(key); put.setDurability(Durability.SKIP_WAL); for (byte[] family: families) { put.add(family, q, value); } - table.put(put); + return put; } public static void deleteAllSnapshots(final Admin admin) diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java index 3893871..b45d676 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestFlushSnapshotFromClient.java @@ -31,7 +31,6 @@ import java.util.concurrent.CountDownLatch; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -41,11 +40,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.ScannerCallable; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.ipc.AbstractRpcClient; -import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; @@ -54,7 +49,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.log4j.Level; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -76,7 +70,6 @@ public class TestFlushSnapshotFromClient { private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final int NUM_RS = 2; private static final byte[] TEST_FAM = Bytes.toBytes("fam"); - 
private static final byte[] TEST_QUAL = Bytes.toBytes("q"); private static final TableName TABLE_NAME = TableName.valueOf("test"); private final int DEFAULT_NUM_ROWS = 100; @@ -145,8 +138,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - Table table = UTIL.getConnection().getTable(TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), @@ -228,8 +220,7 @@ public class TestFlushSnapshotFromClient { SnapshotTestingUtils.assertNoSnapshots(admin); // put some stuff in the table - Table table = UTIL.getConnection().getTable(TABLE_NAME); - SnapshotTestingUtils.loadData(UTIL, table, DEFAULT_NUM_ROWS, TEST_FAM); + SnapshotTestingUtils.loadData(UTIL, TABLE_NAME, DEFAULT_NUM_ROWS, TEST_FAM); LOG.debug("FS state before snapshot:"); FSUtils.logFileSystemState(UTIL.getTestFileSystem(), diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java index 4b36c11..ae1ca13 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreFlushSnapshotFromClient.java @@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.master.MasterFileSystem; import org.apache.hadoop.hbase.master.snapshot.SnapshotManager; @@ -103,8 +102,8 @@ public class TestRestoreFlushSnapshotFromClient { // create Table and disable it SnapshotTestingUtils.createTable(UTIL, tableName, FAMILY); + SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY); Table table = UTIL.getConnection().getTable(tableName); - SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY); snapshot0Rows = UTIL.countRows(table); LOG.info("=== before snapshot with 500 rows"); logFSTree(); @@ -117,7 +116,7 @@ public class TestRestoreFlushSnapshotFromClient { logFSTree(); // insert more data - SnapshotTestingUtils.loadData(UTIL, table, 500, FAMILY); + SnapshotTestingUtils.loadData(UTIL, tableName, 500, FAMILY); snapshot1Rows = UTIL.countRows(table); LOG.info("=== before snapshot with 1000 rows"); logFSTree(); diff --git hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java index e9c9e1f..558ee15 100644 --- hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java +++ hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/HTablePool.java @@ -635,11 +635,6 @@ public class HTablePool implements Closeable { } @Override - public void setAutoFlushTo(boolean autoFlush) { - table.setAutoFlushTo(autoFlush); - } - - @Override public long getWriteBufferSize() { checkState(); return table.getWriteBufferSize();
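The SnapshotTestingUtils and HTablePool hunks above drop setAutoFlushTo()/flushCommits() in favor of the BufferedMutator interface this patch introduces. A minimal sketch of that usage, using the put()/flush()/close() methods as declared on the interface; the table name "example", family "fam", qualifier "q", and method name bufferedLoad are hypothetical.

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

  // Puts are buffered client-side and sent in batches, possibly asynchronously;
  // flush() waits for everything buffered so far, and close() flushes one last time.
  static void bufferedLoad(Connection connection) throws IOException {
    try (BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("example"))) {
      for (int i = 0; i < 10000; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i));
        put.add(Bytes.toBytes("fam"), Bytes.toBytes("q"), Bytes.toBytes(i));
        mutator.put(put);   // buffered; may be written to the table later
      }
      mutator.flush();      // optional here, since close() also flushes
    }
  }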