From 5f1f97bc9524d62bd7d8d30c5825cfacda587c66 Mon Sep 17 00:00:00 2001
From: Nick Dimiduk
Date: Wed, 7 Jan 2015 12:23:02 -0800
Subject: [PATCH] HBASE-12728 [WIP - do not push] experiment with buffers in
 connection

---
 .../hadoop/hbase/client/BufferedConnection.java    |  46 ++++
 .../hbase/client/BufferedConnectionImpl.java       | 296 +++++++++++++++++++++
 .../apache/hadoop/hbase/client/BufferedTable.java  |  52 ++++
 .../hadoop/hbase/client/ConnectionFactory.java     |  28 +-
 .../org/apache/hadoop/hbase/client/HTable.java     |  20 +-
 .../hadoop/hbase/client/HTableInterface.java       |  44 ++-
 .../org/apache/hadoop/hbase/client/HTablePool.java |   5 +
 .../java/org/apache/hadoop/hbase/client/Table.java |  50 ----
 .../hbase/client/example/BufferedTableExample.java | 112 ++++++++
 9 files changed, 594 insertions(+), 59 deletions(-)
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnection.java
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnectionImpl.java
 create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedTable.java
 create mode 100644 hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedTableExample.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnection.java
new file mode 100644
index 0000000..2a43875
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnection.java
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.BufferedTable.ExceptionListener;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface BufferedConnection extends Connection {
+
+  BufferedTable getBufferedTable(TableName tableName) throws IOException;
+
+  BufferedTable getBufferedTable(TableName tableName, ExecutorService pool) throws IOException;
+
+  BufferedTable getBufferedTable(TableName tableName, ExceptionListener listener)
+      throws IOException;
+
+  BufferedTable getBufferedTable(TableName tableName, ExecutorService pool,
+      ExceptionListener listener) throws IOException;
+
+  /* TODO: do we also want ...?
+  void flush() throws IOException;
+  */
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnectionImpl.java
new file mode 100644
index 0000000..e203344
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedConnectionImpl.java
@@ -0,0 +1,296 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.BufferedTable.ExceptionListener;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class BufferedConnectionImpl implements BufferedConnection {
+
+  private static final Log LOG = LogFactory.getLog(BufferedTableImpl.class);
+
+  /**
+   * Thread-safe handler of puts for one or more BufferedTable instances.
+   */
+  private class AsyncMutator {
+    private final AsyncProcess ap;
+    private final ExecutorService pool;
+    private final List<Row> buffer;
+    private final long maxBufferSize;
+    private long currentBufferSize = 0;
+
+    AsyncMutator(ExecutorService pool, AsyncProcess ap, long maxBufferSize) {
+      this.pool = pool;
+      this.ap = ap;
+      this.maxBufferSize = maxBufferSize;
+      this.buffer = new ArrayList<>(100);
+    }
+
+    public ExecutorService getPool() {
+      return pool;
+    }
+
+    public boolean hasError() {
+      return ap.hasError();
+    }
+
+    public synchronized void add(Mutation m) {
+      currentBufferSize += m.heapSize();
+      buffer.add(m);
+    }
+
+    public synchronized boolean needsFlush() {
+      return currentBufferSize > maxBufferSize;
+    }
+
+    public synchronized AsyncProcess.AsyncRequestFuture submit(TableName tableName,
+        boolean atLeastOne, boolean needsResults) throws InterruptedIOException {
+      return ap.submit(tableName, buffer, atLeastOne, null, needsResults);
+    }
+
+    public synchronized boolean isBufferEmpty() {
+      return buffer.isEmpty();
+    }
+
+    public synchronized RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset()
+        throws InterruptedIOException {
+      return ap.waitForAllPreviousOpsAndReset(null);
+    }
+
+    public synchronized void resetAfterSubmit() {
+      currentBufferSize = 0;
+      for (Row mut : buffer) {
+        if (mut instanceof Mutation) {
+          currentBufferSize += ((Mutation) mut).heapSize();
+        }
+      }
+    }
+  }
+
+  class BufferedTableImpl extends HTable {
+
+    private final ExceptionListener errorBack;
+
+    BufferedTableImpl(TableName tableName, ClusterConnection connection,
+        TableConfiguration tableConfig, RpcRetryingCallerFactory rpcCallerFactory,
+        RpcControllerFactory rpcControllerFactory, ExecutorService pool, ExceptionListener l)
+        throws IOException {
+      super(tableName, connection, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
+      this.errorBack = l;
+    }
+
+    @Override
+    protected void finishSetup() throws IOException {
+      super.finishSetup();
+      assert !cleanupPoolOnClose; // pool is managed by connection
+      assert !cleanupConnectionOnClose; // connection is managed by user
+    }
+
+    @Override
+    protected void doPut(Put put) throws InterruptedIOException,
+        RetriesExhaustedWithDetailsException {
+
+      //
+      // method body copied from HTable#doPut and adapted to use AsyncMutator.
+      //
+
+      AsyncMutator eap = aps.get(tableName);
+
+      if (eap.hasError()) {
+        eap.add(put);
+        backgroundFlushCommits(true);
+      }
+
+      validatePut(put);
+      eap.add(put);
+
+      while (eap.needsFlush()) {
+        backgroundFlushCommits(false);
+      }
+    }
+
+    @Override
+    protected void backgroundFlushCommits(boolean synchronous) throws
+        InterruptedIOException, RetriesExhaustedWithDetailsException {
+
+      //
+      // method body copied from HTable#backgroundFlushCommits and adapted to use AsyncMutator.
+      //
+
+      AsyncMutator eap = aps.get(tableName);
+
+      try {
+        if (!synchronous) {
+          eap.submit(tableName, true, false);
+          if (eap.hasError()) {
+            LOG.debug(tableName + ": One or more of the operations have failed -"
+                + " waiting for all operations in progress to finish (successfully or not)");
+          }
+        }
+        if (synchronous || eap.hasError()) {
+          while (!eap.isBufferEmpty()) {
+            eap.submit(tableName, true, false);
+          }
+          RetriesExhaustedWithDetailsException error = eap.waitForAllPreviousOpsAndReset();
+          if (error != null && errorBack != null) {
+            for (int i = 0; i < error.exceptions.size(); i++) {
+              Mutation m = (Mutation) ((error.getRow(i) instanceof Mutation) ?
+                  error.getRow(i) : null);
+              errorBack.onException(error.getCause(i), m);
+            }
+          }
+        }
+      } finally {
+        eap.resetAfterSubmit();
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.closed) {
+        return;
+      }
+      // NOT calling flushCommits() here
+      this.closed = true;
+      // TODO: use reference counting to clean up BufferedConnectionImpl#aps
+      // if this was the last table
+    }
+  }
+
+  private final Connection delegate;
+  private final Map<TableName, AsyncMutator> aps;
+
+  public BufferedConnectionImpl(Connection conn) {
+    delegate = conn;
+    aps = new HashMap<>();
+  }
+
+  @Override
+  public BufferedTable getBufferedTable(TableName tableName) throws IOException {
+    return getBufferedTable(tableName, (ExceptionListener) null);
+  }
+
+  @Override
+  public BufferedTable getBufferedTable(TableName tableName, ExecutorService pool)
+      throws IOException {
+    return getBufferedTable(tableName, pool, null);
+  }
+
+  @Override
+  public BufferedTable getBufferedTable(TableName tableName, ExceptionListener listener)
+      throws IOException {
+    Configuration conf = getConfiguration();
+    ConnectionManager.HConnectionImplementation hci =
+        (ConnectionManager.HConnectionImplementation) delegate;
+    ExecutorService pool;
+    if (!aps.containsKey(tableName)) {
+      pool = HTable.getDefaultExecutor(conf);
+      AsyncProcess ap = new AsyncProcess(hci, getConfiguration(), pool,
+          hci.getNewRpcRetryingCallerFactory(conf), false, RpcControllerFactory.instantiate(conf));
+      aps.put(tableName,
+          new AsyncMutator(pool, ap, conf.getLong("hbase.client.write.buffer", 2097152)));
+    } else {
+      pool = aps.get(tableName).getPool();
+    }
+    return new BufferedTableImpl(tableName, hci, new TableConfiguration(conf),
+        RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf), pool,
+        listener);
+  }
+
+  @Override
+  public BufferedTable getBufferedTable(TableName tableName, ExecutorService pool,
+      ExceptionListener listener) throws IOException {
+    Configuration conf = getConfiguration();
+    ConnectionManager.HConnectionImplementation hci =
+        (ConnectionManager.HConnectionImplementation) delegate;
+    return new BufferedTableImpl(tableName, hci, new TableConfiguration(conf),
+        RpcRetryingCallerFactory.instantiate(conf), RpcControllerFactory.instantiate(conf), pool,
+        listener);
+  }
+
+  @Override
+  public void close() throws IOException {
+    assert aps.isEmpty() : "Leaking resources";
+    /* TODO: instead of assert, something like
+    for (TableName t : aps.keySet()) {
+      try (BufferedTable table = getBufferedTable(t)) {
+        table.flush();
+      }
+    }
+    */
+    delegate.close();
+  }
+
+  //
+  // everything below this point simply defers to the delegate
+  //
+
+  @Override
+  public Configuration getConfiguration() {
+    return delegate.getConfiguration();
+  }
+
+  @Override
+  public Table getTable(TableName tableName) throws IOException {
+    return delegate.getTable(tableName);
+  }
+
+  @Override
+  public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
+    return delegate.getTable(tableName, pool);
+  }
+
+  @Override
+  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+    return delegate.getRegionLocator(tableName);
+  }
+
+  @Override
+  public Admin getAdmin() throws IOException {
+    return delegate.getAdmin();
+  }
+
+  @Override
+  public boolean isClosed() {
+    return delegate.isClosed();
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    delegate.abort(why, e);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return delegate.isAborted();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedTable.java
new file mode 100644
index 0000000..16df757
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedTable.java
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import java.io.IOException;
+
+/**
+ * An interface for interacting with a {@link Table} for which write buffering is enabled.
+ *
+ * @see HTable#setAutoFlushTo(boolean)
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface BufferedTable extends Table {
+
+  /**
+   * Executes all the buffered {@link Put} operations.
+   */
+  void flush() throws IOException;
+
+  /**
+   * An exception handler callback for client applications.
+   */
+  @InterfaceStability.Evolving
+  public interface ExceptionListener {
+
+    /**
+     * Called by {@link org.apache.hadoop.hbase.client.BufferedConnection} when an exception
+     * occurs while flushing the write buffer.
+     */
+    void onException(Throwable e, Mutation m);
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index 3969d2c..e9afe98 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.security.UserProvider;
  * Managing the lifecycle of the {@link Connection}s to the cluster is the responsibility of
  * the caller.
  * From a {@link Connection}, {@link Table} implementations are retrieved
- * with {@link Connection#getTable(TableName)}. Example:
+ * with {@link Connection#getTable(org.apache.hadoop.hbase.TableName)}. Example:
  * <pre>
  * Connection connection = ConnectionFactory.createConnection(config);
  * Table table = connection.getTable(TableName.valueOf("table1"));
@@ -218,6 +218,32 @@ public class ConnectionFactory {
     return createConnection(conf, false, pool, user);
   }
 
+  /**
+   * Create a new BufferedConnection over an existing {@link Connection} instance.
+   */
+  public static BufferedConnection createBufferedConnection(Connection connection) {
+    return new BufferedConnectionImpl(connection);
+  }
+
+  public static BufferedConnection createBufferedConnection(Configuration conf) throws IOException {
+    return createBufferedConnection(createConnection(conf));
+  }
+
+  public static BufferedConnection createBufferedConnection(Configuration conf,
+      ExecutorService pool) throws IOException {
+    return createBufferedConnection(createConnection(conf, pool));
+  }
+
+  public static BufferedConnection createBufferedConnection(Configuration conf,
+      ExecutorService pool, User user) throws IOException {
+    return createBufferedConnection(createConnection(conf, pool, user));
+  }
+
+  public static BufferedConnection createBufferedConnection(Configuration conf, User user)
+      throws IOException {
+    return createBufferedConnection(createConnection(conf, user));
+  }
+
   static Connection createConnection(final Configuration conf, final boolean managed,
       final ExecutorService pool, final User user)
   throws IOException {
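
For orientation while reviewing, a minimal usage sketch of the factory methods added above. This is illustrative only: the table name "t1" and family "f" are assumptions, and since BufferedTable#close() in this patch deliberately does not flush, the explicit flush() call matters.

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.BufferedConnection;
  import org.apache.hadoop.hbase.client.BufferedTable;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public class BufferedConnectionUsage {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // the BufferedConnection wraps an ordinary Connection; both are closed here
      try (BufferedConnection conn = ConnectionFactory.createBufferedConnection(conf);
           BufferedTable table = conn.getBufferedTable(TableName.valueOf("t1"))) {
        Put p = new Put(Bytes.toBytes("row1"));
        p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        table.put(p);  // buffered client-side, not yet sent
        table.flush(); // pushes the buffered edit to the cluster
      }
    }
  }
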
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index c141b29..5e0e3f1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -113,19 +113,19 @@ import com.google.protobuf.ServiceException;
 public class HTable implements HTableInterface, RegionLocator {
   private static final Log LOG = LogFactory.getLog(HTable.class);
   protected ClusterConnection connection;
-  private final TableName tableName;
+  protected final TableName tableName;
   private volatile Configuration configuration;
   private TableConfiguration tableConfiguration;
   protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
   private long writeBufferSize;
   private boolean autoFlush = true;
   protected long currentWriteBufferSize = 0 ;
-  private boolean closed = false;
+  protected boolean closed = false;
   protected int scannerCaching;
   private ExecutorService pool;  // For Multi & Scan
   private int operationTimeout;
-  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
-  private final boolean cleanupConnectionOnClose; // close the connection in close()
+  protected final boolean cleanupPoolOnClose; // shutdown the pool in close()
+  protected final boolean cleanupConnectionOnClose; // close the connection in close()
   private Consistency defaultConsistency = Consistency.STRONG;
 
   /** The Async process for puts with autoflush set to false or multiputs */
@@ -345,7 +345,7 @@ public class HTable implements HTableInterface, RegionLocator {
   /**
    * setup this HTable's parameter based on the passed configuration
    */
-  private void finishSetup() throws IOException {
+  protected void finishSetup() throws IOException {
     if (tableConfiguration == null) {
       tableConfiguration = new TableConfiguration(configuration);
     }
@@ -1054,7 +1054,8 @@ public class HTable implements HTableInterface, RegionLocator {
    * @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
    * @throws InterruptedIOException if we were interrupted.
    */
-  private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+  protected void doPut(Put put) throws InterruptedIOException,
+      RetriesExhaustedWithDetailsException {
     // This behavior is highly non-intuitive... it does not protect us against
     // 94-incompatible behavior, which is a timing issue because hasError, the below code
     // and setter of hasError are not synchronized. Perhaps it should be removed.
@@ -1081,7 +1082,7 @@ public class HTable implements HTableInterface, RegionLocator {
    * @param synchronous - if true, sends all the writes and wait for all of them to finish before
    *                     returning.
    */
-  private void backgroundFlushCommits(boolean synchronous) throws
+  protected void backgroundFlushCommits(boolean synchronous) throws
       InterruptedIOException, RetriesExhaustedWithDetailsException {
 
     try {
@@ -1472,6 +1473,11 @@ public class HTable implements HTableInterface, RegionLocator {
     return objectResults;
   }
 
+  @Override
+  public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
+    flushCommits();
+  }
+
   /**
    * {@inheritDoc}
    */
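
The flush() added above simply delegates to flushCommits() on a plain HTable; for BufferedTable users the interesting part is pairing flush() with the ExceptionListener callback. Below is a hypothetical sketch of a listener that collects failed mutations for replay; the class and method names are illustrative and not part of this patch.

  import java.io.IOException;
  import java.util.Queue;
  import java.util.concurrent.ConcurrentLinkedQueue;

  import org.apache.hadoop.hbase.client.BufferedTable;
  import org.apache.hadoop.hbase.client.Mutation;
  import org.apache.hadoop.hbase.client.Put;

  /** Collects mutations that fail during a flush so the caller can retry them. */
  public class RetryingListener implements BufferedTable.ExceptionListener {
    private final Queue<Put> failed = new ConcurrentLinkedQueue<>();

    @Override
    public void onException(Throwable cause, Mutation m) {
      // the patch passes the failed Mutation when it is recoverable, else null
      if (m instanceof Put) {
        failed.add((Put) m);
      }
    }

    /** Re-submits everything that failed; the caller decides when to give up. */
    public void replay(BufferedTable table) throws IOException {
      for (Put p; (p = failed.poll()) != null;) {
        table.put(p);
      }
      table.flush();
    }
  }
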
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
index 911e034..37f9662 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 @Deprecated
 @InterfaceAudience.Private
 @InterfaceStability.Stable
-public interface HTableInterface extends Table {
+public interface HTableInterface extends BufferedTable {
 
   /**
    * Gets the name of this table.
@@ -59,6 +59,48 @@ public interface HTableInterface extends Table {
   @Deprecated
   Boolean[] exists(List<Get> gets) throws IOException;
 
+  /**
+   * Tells whether or not 'auto-flush' is turned on.
+   *
+   * @return {@code true} if 'auto-flush' is enabled (default), meaning
+   * {@link Put} operations don't get buffered/delayed and are immediately
+   * executed.
+   */
+  boolean isAutoFlush();
+
+  /**
+   * Executes all the buffered {@link Put} operations.
+   * <p>
+   * This method gets called once automatically for every {@link Put} or batch
+   * of {@link Put}s (when <code>put(List&lt;Put&gt;)</code> is used) when
+   * {@link #isAutoFlush} is {@code true}.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  void flushCommits() throws IOException;
+
+  /**
+   * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}.
+   */
+  void setAutoFlushTo(boolean autoFlush);
+
+  /**
+   * Returns the maximum size in bytes of the write buffer for this HTable.
+   * <p>
+   * The default value comes from the configuration parameter
+   * {@code hbase.client.write.buffer}.
+   * @return The size of the write buffer in bytes.
+   */
+  long getWriteBufferSize();
+
+  /**
+   * Sets the size of the buffer in bytes.
+   * <p>
+   * If the new size is less than the current amount of data in the
+   * write buffer, the buffer gets flushed.
+   * @param writeBufferSize The new write buffer size, in bytes.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  void setWriteBufferSize(long writeBufferSize) throws IOException;
 
   /**
    * See {@link #setAutoFlush(boolean, boolean)}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
index 4b998a6..b49f442 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java
@@ -517,6 +517,11 @@ public class HTablePool implements Closeable {
     }
 
     @Override
+    public void flush() throws IOException {
+      flushCommits();
+    }
+
+    @Override
     public void flushCommits() throws IOException {
       checkState();
       table.flushCommits();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 07e4c08..4971b18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -219,9 +219,6 @@ public interface Table extends Closeable {
 
   /**
    * Puts some data in the table.
-   * <p>
-   * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
-   * until the internal buffer is full.
    * @param put The data to put.
    * @throws IOException if a remote or network exception occurs.
    * @since 0.20.0
    */
   void put(Put put) throws IOException;
 
@@ -231,9 +228,6 @@ public interface Table extends Closeable {
   /**
    * Puts some data in the table, in batch.
    * <p>
-   * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
-   * until the internal buffer is full.
-   * <p>
    * This can be used for group commit, or for submitting user defined
    * batches.  The writeBuffer will be periodically inspected while the List
    * is processed, so depending on the List size the writeBuffer may flush
@@ -498,50 +492,6 @@ public interface Table extends Closeable {
     final Batch.Callback<R> callback) throws ServiceException, Throwable;
 
   /**
-   * Tells whether or not 'auto-flush' is turned on.
-   *
-   * @return {@code true} if 'auto-flush' is enabled (default), meaning
-   * {@link Put} operations don't get buffered/delayed and are immediately
-   * executed.
-   */
-  boolean isAutoFlush();
-
-  /**
-   * Executes all the buffered {@link Put} operations.
-   * <p>
-   * This method gets called once automatically for every {@link Put} or batch
-   * of {@link Put}s (when <code>put(List&lt;Put&gt;)</code> is used) when
-   * {@link #isAutoFlush} is {@code true}.
-   * @throws IOException if a remote or network exception occurs.
-   */
-  void flushCommits() throws IOException;
-
-  /**
-   * Set the autoFlush behavior, without changing the value of {@code clearBufferOnFail}
-   */
-  void setAutoFlushTo(boolean autoFlush);
-
-  /**
-   * Returns the maximum size in bytes of the write buffer for this HTable.
-   * <p>
-   * The default value comes from the configuration parameter
-   * {@code hbase.client.write.buffer}.
-   * @return The size of the write buffer in bytes.
-   */
-  long getWriteBufferSize();
-
-  /**
-   * Sets the size of the buffer in bytes.
-   * <p>
-   * If the new size is less than the current amount of data in the
-   * write buffer, the buffer gets flushed.
-   * @param writeBufferSize The new write buffer size, in bytes.
-   * @throws IOException if a remote or network exception occurs.
-   */
-  void setWriteBufferSize(long writeBufferSize) throws IOException;
-
-
-  /**
    * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
    * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive), all
    * the invocations to the same region server will be batched into one call. The coprocessor
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedTableExample.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedTableExample.java
new file mode 100644
index 0000000..8d8334d
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/BufferedTableExample.java
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.example;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedConnection;
+import org.apache.hadoop.hbase.client.BufferedTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class BufferedTableExample extends Configured implements Tool {
+
+  private static final int POOL_SIZE = 10;
+  private static final int TASK_COUNT = 100;
+  private static final int ROWS_PER_TASK = 10000;
+  private static final TableName TABLE = TableName.valueOf("foo");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+
+  @Override
+  public int run(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
+    // a single BufferedConnection instance, shared by all worker threads
+    try (final BufferedConnection conn = ConnectionFactory.createBufferedConnection(getConf())) {
+      ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
+      List<Future<Void>> futures = new ArrayList<>(TASK_COUNT);
+
+      for (int i = 0; i < TASK_COUNT; i++) {
+        futures.add(pool.submit(new Callable<Void>() {
+          private Random rand = new Random(System.currentTimeMillis() % 17);
+
+          @Override
+          public Void call() throws Exception {
+
+            // sized to hold a full task's worth of edits; add() throws if the queue fills
+            final BlockingQueue<Put> workQueue = new ArrayBlockingQueue<>(ROWS_PER_TASK);
+
+            BufferedTable.ExceptionListener listener = new BufferedTable.ExceptionListener() {
+              @Override
+              public void onException(Throwable e, Mutation m) {
+                // re-enqueue failed edits so they are retried
+                workQueue.add((Put) m);
+              }
+            };
+
+            // populate the work queue
+            for (int j = 0; j < ROWS_PER_TASK; j++) {
+              byte[] rowkey = new byte[25], qual, value;
+              rand.nextBytes(rowkey);
+              qual = Bytes.toBytes(rand.nextInt(20));
+              value = Bytes.toBytes(rand.nextInt(100));
+              Put p = new Put(rowkey);
+              p.add(FAMILY, qual, value);
+              workQueue.add(p);
+            }
+
+            // each worker grabs a BufferedTable instance; a single buffer is shared
+            // across all threads
+            try (BufferedTable t = conn.getBufferedTable(TABLE, listener)) {
+              while (!workQueue.isEmpty()) {
+                Put p = workQueue.take();
+                t.put(p);
+              }
+            } // t.close() is a noop; call t.flush() to ensure these edits are sent before exiting
+            return null;
+          }
+        }));
+      }
+
+      for (Future<Void> f : futures) {
+        f.get(5, TimeUnit.MINUTES);
+      }
+    } catch (IOException e) {
+      // exception while creating or closing the BufferedConnection
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new BufferedTableExample(), args);
+  }
+}
--
1.9.3 (Apple Git-50)