diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java
new file mode 100644
index 0000000..9a0740a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java
@@ -0,0 +1,75 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * This class is responsible for spooling mutations to the configured filesystem.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class FilesystemMutationSpooler implements MutationSpooler {
+
+ /**
+ *
+ */
+ public FilesystemMutationSpooler() {
+ // TODO Auto-generated constructor stub
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(org.apache.hadoop.hbase.client.Mutation)
+ */
+ @Override
+ public void mutate(Mutation m) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(java.util.List)
+ */
+ @Override
+ public void mutate(List<? extends Mutation> ms) throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.client.MutationSpooler#flush()
+ */
+ @Override
+ public void flush() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.hbase.client.MutationSpooler#close()
+ */
+ @Override
+ public void close() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java
new file mode 100644
index 0000000..6d41ba4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java
@@ -0,0 +1,64 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Implementations are responsible for spooling mutations.
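+ *
+ * A minimal usage sketch (illustrative only; it assumes an existing {@code Put}
+ * named {@code put} and uses {@link FilesystemMutationSpooler} as the
+ * implementation):
+ *
+ * <pre>{@code
+ * MutationSpooler spooler = new FilesystemMutationSpooler();
+ * try {
+ *   spooler.mutate(put);   // spool a single mutation
+ *   spooler.flush();       // push everything buffered so far to the sink
+ * } finally {
+ *   spooler.close();       // flush and release resources
+ * }
+ * }</pre>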
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface MutationSpooler {
+
+ /**
+ * Spool a {@link Mutation} to an external sink. Currently only supports
+ * {@link Put} and {@link Delete} mutations.
+ *
+ * @param m The data to send.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ public void mutate(Mutation m) throws IOException;
+
+ /**
+ * Spool a list of {@link Mutation}s to an external sink. Currently only supports
+ * {@link Put} and {@link Delete} mutations.
+ *
+ * @param ms The data to send.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ public void mutate(List<? extends Mutation> ms) throws IOException;
+
+ /**
+ * Executes all the buffered, asynchronous {@link Mutation} operations and
+ * waits until they are done.
+ *
+ * @throws IOException if a remote or network exception occurs.
+ */
+ public void flush() throws IOException;
+
+ /**
+ * Performs a {@link #flush()} and releases any resources held.
+ *
+ * @throws IOException if a remote or network exception occurs.
+ */
+ public void close() throws IOException;
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java
new file mode 100644
index 0000000..e0b7fe1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java
@@ -0,0 +1,245 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class keeps track of HBase state, and determines how long to wait for a
+ * submission.
+ *
+ * Note that we can get extra information from an exception handler, which
+ * can help us determine whether HBase is unhealthy and/or has recovered.
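+ *
+ * A sketch of the transitions permitted by {@link State#transition(State)},
+ * starting from the initial GOOD state (derived from how {@code report} and
+ * {@code getTimeout} drive the state):
+ *
+ * <pre>
+ *   GOOD          --(submission fails or takes too long)--> BAD
+ *   BAD           --(a submission succeeds)---------------> TRANSITIONING
+ *   TRANSITIONING --(a submission succeeds)---------------> GOOD
+ *   TRANSITIONING --(submission fails or takes too long)--> BAD
+ * </pre>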
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class SpoolingBufferedMutatorCoordinator {
+
+ private final TimeUnit TU = MILLISECONDS;
+
+ /**
+ * TODO: determine exact value, for example 100 seconds. TODO: make this
+ * configurable
+ */
+ private final long submitTimeoutMillis = 100000; // 100 seconds
+
+ /**
+ * The timeout increment to wait, after which we're asked again how much
+ * additional time to wait. We will not get control back for up to this time.
+ *
+ * Note: should be strictly smaller than submitTimeoutMillis.
+ *
+ * TODO: make this configurable
+ */
+ private final long timeoutIncrementMillis = 1000; // 1 second
+
+ /**
+ * Capture the states that HBase can be in.
+ */
+ @VisibleForTesting
+ enum State {
+ /**
+ * HBase has some issues. Spool data, and don't wait for mutations to
+ * complete. Next states {TRANSITIONING}
+ */
+ BAD,
+ /**
+ * HBase is in a good state, wait the regular time for mutations to
+ * complete. Do not spool. Next states {BAD}
+ */
+ GOOD,
+ /**
+ * HBase connection was bad before. Still spool, because we don't know
+ * exactly which mutations went to HBase, but do wait for mutations to
+ * complete, so that we're not bypassing submitting to the BufferedMutator.
+ * Next states {BAD,GOOD}
+ */
+ TRANSITIONING;
+
+ /**
+ * @param newState the state to transition to
+ * @return the new state if allowed
+ * @throws IllegalStateException if the transition isn't allowed.
+ */
+ public State transition(State newState) {
+ // Disallow illegal transitions
+ switch (this) {
+ case BAD:
+ if (newState == State.GOOD) {
+ throw new IllegalStateException(
+ "Cannot transition from " + this + " to " + newState);
+ }
+ break;
+ case GOOD:
+ if (newState == State.TRANSITIONING) {
+ throw new IllegalStateException(
+ "Cannot transition from " + this + " to " + newState);
+ }
+ break;
+ default:
+ break;
+ }
+ return newState;
+ }
+
+ }
+
+ /**
+ * Keep track of the current state. Don't manipulate directly; use {@link #setState(State)}.
+ */
+ protected volatile State state = State.GOOD;
+
+ /**
+ * TODO: determine if the caller manages the lifecycle, or we do.
+ */
+ public SpoolingBufferedMutatorCoordinator() {
+ }
+
+ /**
+ * Transitions to a new state, if that state is legal.
+ * @param newState the new state to transition to.
+ */
+ @VisibleForTesting
+ protected void setState(State newState) {
+ state = state.transition(newState);
+ }
+
+ /**
+ * How much longer processing needs to wait for the current submission to be
+ * processed. A value of 0 indicates that HBase must be in a bad state and
+ * processing doesn't have to wait. Therefore, outbound items must be spooled,
+ * not dropped. Any value larger than 0 means that the processing must wait
+ * for completion for at least the specified amount of time or ask again.
+ *
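+ * A caller is expected to poll in a loop, along these lines (a hypothetical
+ * sketch; {@code future} is assumed to track the in-flight submission):
+ *
+ * <pre>{@code
+ * long timeout = coordinator.getTimeout(submission);
+ * while (timeout > 0) {
+ *   try {
+ *     future.get(timeout, coordinator.getTimeUnit());
+ *     break; // the submission completed in time
+ *   } catch (TimeoutException te) {
+ *     timeout = coordinator.getTimeout(submission); // ask again
+ *   } catch (InterruptedException | ExecutionException e) {
+ *     break; // handled elsewhere
+ *   }
+ * }
+ * // timeout == 0: stop waiting and spool the outbound items instead
+ * }</pre>
+ *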
+ * @param submission the submission being waited on; may be null.
+ * @return the time in {@link #getTimeUnit()} units to wait or 1 if submission is null
+ */
+ public synchronized long getTimeout(
+ SpoolingBufferedMutatorSubmission submission) {
+
+ if (submission == null) {
+ return 1;
+ }
+
+ if ((state == State.BAD) && (submission
+ .submissionType() != SpoolingBufferedMutatorSubmissionType.CLOSE)) {
+ return 0;
+ } else {
+ // We're in GOOD or TRANSITIONING state, or this is a CLOSE, so wait for
+ // the submission to go through the BM.
+
+ // TODO: perhaps we want to wait longer for larger submissions.
+ long size = submission.size();
+ long count = submission.count();
+
+ long submitTime = submission.getSubmitNanonTime();
+
+ if (submitTime <= 0) {
+ // The submission hasn't been added to the BufferedMutator yet.
+ return timeoutIncrementMillis;
+ } else {
+ long now = System.nanoTime();
+ // We're measuring in nanos, but probably returning in a different unit.
+ long waitTime = TU.convert(now - submitTime, TimeUnit.NANOSECONDS);
+ if (waitTime < submitTimeoutMillis) {
+ return timeoutIncrementMillis;
+ } else {
+ // We had to wait too long for this submission to go through, assume
+ // HBase is in bad shape and start spooling.
+ setState(State.BAD);
+ return 0;
+ }
+ }
+ }
+
+ }
+
+ /**
+ * @return the time unit to use for waiting.
+ */
+ public TimeUnit getTimeUnit() {
+ return TU;
+ }
+
+ /**
+ * @param submission determines the flush count up to which we should spool or
+ * drop.
+ * @return true if outbound items in the batch represented by submission must
+ * be spooled, false if they can be dropped.
+ */
+ public boolean shouldSpool(SpoolingBufferedMutatorSubmission submission) {
+ // TODO: should we keep track of the last successful flush that went through
+ // and perhaps still drop some items from outbound, even if we just went
+ // into a bad state?
+ if (state == State.GOOD) {
+ return false;
+ } else {
+ // Spool for both BAD and TRANSITIONING states.
+ return true;
+ }
+ }
+
+ /**
+ * Report that a submission is done.
+ * @param result the result of the submission
+ * @param cause the exception thrown by the submission, or null on success.
+ */
+ public void report(SpoolingBufferedMutatorSubmission result,
+ Throwable cause) {
+ // TODO: put intelligence here to clean info from the cause to determine if
+ // HBase is down, and how severe the problem is.
+
+ switch (state) {
+ case BAD:
+ if (cause == null) {
+ setState(State.TRANSITIONING);
+ } else {
+ // TODO: stay in bad state, perhaps capture the time and count of
+ // remaining in bad state.
+ }
+ break;
+ case GOOD:
+ if (cause == null) {
+ // TODO: remain in good state. Perhaps capture count and time in good
+ // state.
+ } else {
+ // TODO: are we going to transition to bad state upon one single error?
+ setState(State.BAD);
+ }
+ break;
+ case TRANSITIONING:
+ if (cause == null) {
+ // TODO: determine when to transition back to good. Presumably we want 2
+ // or more good flushes before we transition back to good.
+ setState(State.GOOD);
+ } else {
+ setState(State.BAD);
+ }
+ break;
+ }
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java
new file mode 100644
index 0000000..160a6f4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Used to flush one batch of mutations from the outbound queue. Mutations can
+ * be spooled or dropped.
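+ *
+ * A rough usage sketch mirroring {@code SpoolingBufferedMutatorImpl}: one
+ * flusher is created per flush/close submission and handed to an executor
+ * (the executor name is illustrative):
+ *
+ * <pre>{@code
+ * SpoolingBufferedMutatorFlusher flusher =
+ *     new SpoolingBufferedMutatorFlusher(submission, outbound, coordinator, spooler);
+ * Future<SpoolingBufferedMutatorSubmission> result = flushES.submit(flusher);
+ * result.get(); // block until the batch has been spooled or dropped
+ * }</pre>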
+ */
+public class SpoolingBufferedMutatorFlusher
+ implements Callable<SpoolingBufferedMutatorSubmission> {
+
+ private final BlockingQueue<SpoolingBufferedMutatorSubmission> outbound;
+ private final SpoolingBufferedMutatorCoordinator coordinator;
+
+ private final MutationSpooler spooler;
+ private final SpoolingBufferedMutatorSubmission submission;
+
+ /**
+ * @param submission the {@link SpoolingBufferedMutatorSubmissionType#FLUSH}
+ * or {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submission
+ * to flush out of the outbound queue. We need to wait on the
+ * submission to be ready to spool. It may not have made it in the
+ * queue yet, and/or we may not yet have decided whether to drop or
+ * spool this batch of mutations. Cannot be a mutate submission.
+ * Cannot be null.
+ * @param outbound the queue from which to flush submissions.
+ * @param coordinator that helps determine whether items should be spooled or
+ * dropped.
+ * @param spooler used to flush, if the coordinator indicates so.
+ */
+ public SpoolingBufferedMutatorFlusher(
+ SpoolingBufferedMutatorSubmission submission,
+ BlockingQueue<SpoolingBufferedMutatorSubmission> outbound,
+ SpoolingBufferedMutatorCoordinator coordinator, MutationSpooler spooler) {
+ // TODO: validate inputs
+ this.submission = submission;
+ this.outbound = outbound;
+ this.coordinator = coordinator;
+ this.spooler = spooler;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.concurrent.Callable#call()
+ */
+ @Override
+ public SpoolingBufferedMutatorSubmission call() throws Exception {
+
+ // TODO: configure the wait time for the flush to go through and decide what
+ // to do if we time out.
+ long timeout = Long.MAX_VALUE;
+ TimeUnit tu = TimeUnit.SECONDS;
+
+ // We need to wait because the BufferedMutatorImpl has enqueued the flush,
+ // but we don't yet know if it has been processed and whether it was
+ // successfully submitted.
+ submission.waitToFlush(timeout, tu);
+
+ // Flush items up to this flushCount, but none later. Items could have been
+ // enqueued into inbound (and therefore outbound) out of order.
+ long maxFlushCount = submission.flushCount();
+
+ // Coordinator determines what to do.
+ boolean shouldSpool = coordinator.shouldSpool(submission);
+
+ // We cannot quite drain the entire queue because a) out of order items
+ // might be in there, and b) items could be added after we started
+ // iterating.
+ int outboundSize = outbound.size();
+
+ SpoolingBufferedMutatorSubmission s;
+ for (int i = outboundSize; i > 0; i--) {
+ // TODO: should we use poll and possibly time out? What to do if we do
+ // time out?
+ s = outbound.take();
+ if (s.flushCount() > maxFlushCount) {
+ // Put it back for later processing.
+ outbound.put(s);
+ } else {
+ // TODO: should we always close, no matter whether we should flush or
+ // not?
+ if (shouldSpool) {
+ switch (s.submissionType()) {
+ case CLOSE:
+ spooler.close();
+ break;
+ case FLUSH:
+ spooler.flush();
+ break;
+ case MUTATE:
+ spooler.mutate(s.mutations());
+ break;
+ default:
+ // TODO: log some sort of error message.
+ break;
+ }
+ } else {
+ // Do not spool, just drop the record on the ground.
+ }
+ // TODO: the flusher should probably not drop any flushes other than
+ // its "own" flush.
+ }
+ } // for loop over outbound queue
+
+ // TODO: perhaps add some debugging with counts here.
+
+ // Return so that the caller (SpoolingBufferedMutatorImpl) knows this flush
+ // or close has been completed.
+ return submission;
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java
new file mode 100644
index 0000000..ae46dec
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java
@@ -0,0 +1,343 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.CLOSE;
+import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.FLUSH;
+import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.MUTATE;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
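+/**
+ * A {@link BufferedMutator} that delegates to a wrapped BufferedMutator and,
+ * when HBase appears to be unhealthy or submissions take too long, spools
+ * mutations to a {@link MutationSpooler} instead of blocking the caller
+ * indefinitely.
+ *
+ * An illustrative usage sketch (assuming an existing {@code Connection},
+ * {@code TableName}, and {@code Put}); the params route relies on
+ * {@link BufferedMutatorParams#implementationClassName(String)}:
+ *
+ * <pre>{@code
+ * BufferedMutatorParams params = new BufferedMutatorParams(tableName)
+ *     .implementationClassName(SpoolingBufferedMutatorImpl.class.getName());
+ * try (BufferedMutator mutator = connection.getBufferedMutator(params)) {
+ *   mutator.mutate(put);
+ *   mutator.flush();
+ * }
+ * }</pre>
+ */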
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SpoolingBufferedMutatorImpl implements BufferedMutator {
+
+ private static final Log LOG =
+ LogFactory.getLog(SpoolingBufferedMutatorImpl.class);
+
+ /**
+ * Queue to hand off to submission.
+ */
+ @VisibleForTesting
+ final BlockingQueue<SpoolingBufferedMutatorSubmission> inbound =
+ new LinkedBlockingQueue<>();
+
+ /**
+ * Queue to track submissions that need to possibly be spooled.
+ */
+ @VisibleForTesting
+ final BlockingQueue<SpoolingBufferedMutatorSubmission> outbound =
+ new LinkedBlockingQueue<>();
+
+ private final BufferedMutator wrapped;
+ private volatile boolean closed = false;
+
+ /**
+ * Track the number of flushes we see, so that a batch of submissions is
+ * marked with the same flushCount. Note that multiple threads can mutate, so
+ * mutations of different flushCounts could be queued out of order.
+ */
+ private final AtomicLong flushCount = new AtomicLong(0);
+
+ private final SpoolingBufferedMutatorCoordinator coordinator;
+
+ /**
+ * Only one single process will be running here (at a time). The executor
+ * service will re-spawn a thread if it crashes.
+ */
+ ExecutorService processES = Executors.newSingleThreadExecutor();
+
+ /**
+ * Used to push submissions into the wrapped BufferedMutator.
+ */
+ ExecutorService submitES = Executors.newSingleThreadExecutor();
+
+ /**
+ * Used to run flush requests, one per flush.
+ */
+ ExecutorService flushES = Executors.newSingleThreadExecutor();
+
+ /**
+ * Hang on to this, in order to be able to cancel.
+ */
+ private final Future<Void> processResult;
+
+ private final MutationSpooler spooler;
+
+ /**
+ * @param bm the buffered mutator that will be delegated to.
+ * @param c the coordinator to use; if null, a new one is created.
+ * @param spooler the spooler to use when mutations cannot be submitted.
+ */
+ SpoolingBufferedMutatorImpl(BufferedMutator bm,
+ SpoolingBufferedMutatorCoordinator c, MutationSpooler spooler) {
+ if (bm == null) {
+ throw new IllegalArgumentException(
+ "Wrapped BufferedMutator cannot be null.");
+ }
+ this.wrapped = bm;
+ if (c == null) {
+ this.coordinator = new SpoolingBufferedMutatorCoordinator();
+ } else {
+ this.coordinator = c;
+ }
+
+ this.spooler = spooler;
+
+ SpoolingBufferingMutatorProcessor processor =
+ new SpoolingBufferingMutatorProcessor(inbound, outbound, wrapped,
+ submitES, coordinator);
+
+ // TODO: determine if we should create executor services, or let these be
+ // managed in the underlying structures.
+
+ processResult = processES.submit(processor);
+ }
+
+ /**
+ * Requests a BufferedMutator from the passed connection and wraps it.
+ *
+ * @param conn the connection used to create the wrapped BufferedMutator.
+ * @param rpcCallerFactory ignored.
+ * @param rpcFactory ignored
+ * @param params used to pass to connection, minus the class name that causes
+ * it to create this SpoolingBufferedMutator
+ * @throws IOException if the wrapped BufferedMutator cannot be created.
+ */
+ SpoolingBufferedMutatorImpl(ClusterConnection conn,
+ RpcRetryingCallerFactory rpcCallerFactory,
+ RpcControllerFactory rpcFactory, BufferedMutatorParams params)
+ throws IOException {
+
+ if (params == null) {
+ throw new IllegalArgumentException("BufferedMutatorParams cannot be null.");
+ }
+ BufferedMutatorParams clonedParams = params.clone();
+ // Wipe out the reference to this class to avoid an infinite loop
+ clonedParams.implementationClassName(null);
+
+ // TODO: is it possible to have the BufferedMutator creation to hang / fail
+ // and therefore end up in a bad state to begin with? If so, we should
+ // change the signature and let the Processor or Submitter create the
+ // connection, perhaps as a separate SubmissionType. That way we won't stall
+ // if we want to connect to an HBase cluster that is already down, and we'd
+ // start spooling right away.
+ // TODO: Similarly, we probably want to ensure that we can connect (?) to
+ // the spooling filesystem.
+ this.wrapped = conn.getBufferedMutator(clonedParams);
+ this.coordinator = new SpoolingBufferedMutatorCoordinator();
+
+ // TODO: read some config variable to make this pluggable.
+ spooler = new FilesystemMutationSpooler();
+
+ SpoolingBufferingMutatorProcessor processor =
+ new SpoolingBufferingMutatorProcessor(inbound, outbound, wrapped,
+ submitES, coordinator);
+
+ // TODO: determine if we should create executor services, or let these be
+ // managed in the underlying structures.
+
+ processResult = processES.submit(processor);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#getName()
+ */
+ @Override
+ public TableName getName() {
+ return wrapped.getName();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#getConfiguration()
+ */
+ @Override
+ public Configuration getConfiguration() {
+ return wrapped.getConfiguration();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hbase.client.BufferedMutator#mutate(org.apache.hadoop.
+ * hbase.client.Mutation)
+ */
+ @Override
+ public void mutate(Mutation m) throws IOException {
+ mutate(Collections.singletonList(m));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#mutate(java.util.List)
+ */
+ @Override
+ public void mutate(List<? extends Mutation> ms) throws IOException {
+ if (closed) {
+ throw new IllegalStateException(
+ "Cannot put when the BufferedMutator is closed.");
+ }
+
+ if (ms == null) {
+ return;
+ }
+
+ SpoolingBufferedMutatorSubmission s =
+ new SpoolingBufferedMutatorSubmission(MUTATE, ms, flushCount.get());
+ try {
+ inbound.put(s);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while trying to mutate.", e);
+ // Yield and preserve status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#close()
+ */
+ @Override
+ public synchronized void close() throws IOException {
+
+ if (closed) {
+ return;
+ }
+ SpoolingBufferedMutatorSubmission s =
+ new SpoolingBufferedMutatorSubmission(CLOSE, null, flushCount.get());
+ try {
+ inbound.put(s);
+
+ waitForFlusher(s);
+
+ // TODO: do we need to do anything with processResult ?
+
+ // TODO: ensure that all executors are wrapped up.
+
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while trying to close.", e);
+ // Yield and preserve status
+ Thread.currentThread().interrupt();
+ } finally {
+ this.closed = true;
+ }
+ }
+
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#flush()
+ */
+ @Override
+ public synchronized void flush() throws IOException {
+ if (closed) {
+ throw new IllegalStateException(
+ "Cannot flush when the BufferedMutator is closed.");
+ }
+ SpoolingBufferedMutatorSubmission s = new SpoolingBufferedMutatorSubmission(
+ FLUSH, null, flushCount.incrementAndGet());
+ try {
+ inbound.put(s);
+ waitForFlusher(s);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while trying to flush.", e);
+ // Yield and preserve status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Wait for the flush to be submitted (or timed out).
+ * @param s the flush or close submission
+ * @throws InterruptedException if interrupted while waiting.
+ */
+ private void waitForFlusher(SpoolingBufferedMutatorSubmission s)
+ throws InterruptedException {
+ SpoolingBufferedMutatorFlusher flusher =
+ new SpoolingBufferedMutatorFlusher(s, outbound, coordinator, spooler);
+ // Wait for the flush or close to complete before returning.
+ Future<SpoolingBufferedMutatorSubmission> result = flushES.submit(flusher);
+
+ // TODO: properly handle all exceptions and deal with timeouts.
+ try {
+ // Block until this flush is done.
+ result.get();
+ } catch (ExecutionException e) {
+ // TODO: decide how to surface this to the caller.
+ LOG.error("Waiting for the flusher failed.", e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#getWriteBufferSize()
+ */
+ @Override
+ public long getWriteBufferSize() {
+ return wrapped.getWriteBufferSize();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#setRpcTimeout(int)
+ */
+ @Override
+ public void setRpcTimeout(int timeout) {
+ wrapped.setRpcTimeout(timeout);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hbase.client.BufferedMutator#setOperationTimeout(int)
+ */
+ @Override
+ public void setOperationTimeout(int timeout) {
+ wrapped.setOperationTimeout(timeout);
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java
new file mode 100644
index 0000000..d92bf46
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java
@@ -0,0 +1,228 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.MUTATE;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Used to track mutations, flush, and close actions on the
+ * {@link SpoolingBufferedMutatorImpl}.
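+ *
+ * A rough sketch of the life cycle, mirroring how the processor and flusher
+ * use a submission (variable names are illustrative):
+ *
+ * <pre>{@code
+ * SpoolingBufferedMutatorSubmission s = new SpoolingBufferedMutatorSubmission(
+ *     SpoolingBufferedMutatorSubmissionType.MUTATE, mutations, flushCount);
+ * s.submit(bufferedMutator); // push the mutations into the delegate, tracking timing
+ * s.readyToFlush();          // processor signals the flusher (no-op for MUTATE)
+ * boolean ready = s.waitToFlush(1, TimeUnit.SECONDS); // flusher waits on FLUSH/CLOSE
+ * }</pre>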
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class SpoolingBufferedMutatorSubmission {
+
+ private final SpoolingBufferedMutatorSubmissionType submissionType;
+ private final List<? extends Mutation> mutations;
+ private final long flushCount;
+ /**
+ * The cumulative heap-size of the list of mutations.
+ */
+ private final long size;
+ /**
+ * The number of mutations that this submission consists of.
+ */
+ private final long count;
+
+ private volatile long submitNanonTime = 0;
+ private volatile long completionNanonTime = 0;
+
+ /**
+ * Used so that the spooler thread knows whether this flush or close submission
+ * has made it through submission (or timed out).
+ */
+ private final CountDownLatch flushLatch;
+
+ /**
+ * @param submissionType non-null type
+ * @param mutations Cannot be null for
+ * {@link SpoolingBufferedMutatorSubmissionType#MUTATE}, but is
+ * ignored for {@link SpoolingBufferedMutatorSubmissionType#FLUSH}
+ * and {@link SpoolingBufferedMutatorSubmissionType#CLOSE}
+ * @param flushCount indicating which batch this submission is part of
+ */
+ public SpoolingBufferedMutatorSubmission(
+ SpoolingBufferedMutatorSubmissionType submissionType,
+ List<? extends Mutation> mutations, long flushCount) {
+ if (submissionType == null) {
+ throw new IllegalArgumentException("Submissiontype cannot be null.");
+ }
+ this.submissionType = submissionType;
+
+ if (submissionType == MUTATE) {
+ if (mutations == null) {
+ throw new IllegalArgumentException("Cannot mutate nothing.");
+ }
+ this.mutations = mutations;
+ long toAddSize = 0;
+ int toAddCount = 0;
+ for (Mutation m : mutations) {
+ toAddSize += m.heapSize();
+ ++toAddCount;
+ }
+ size = toAddSize;
+ count = toAddCount;
+ flushLatch = null;
+ } else {
+ this.mutations = null;
+ this.size = 0;
+ this.count = 0;
+ flushLatch = new CountDownLatch(1);
+ }
+
+ this.flushCount = flushCount;
+ }
+
+ public SpoolingBufferedMutatorSubmissionType submissionType() {
+ return submissionType;
+ }
+
+ /**
+ * @return mutations. Cannot be null for
+ * {@link SpoolingBufferedMutatorSubmissionType#MUTATE}, but is null
+ * for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and
+ * {@link SpoolingBufferedMutatorSubmissionType#CLOSE}
+ */
+ public List<? extends Mutation> mutations() {
+ return mutations;
+ }
+
+ public long flushCount() {
+ return flushCount;
+ }
+
+ /**
+ * @return the total heap size of all mutations.
+ */
+ public long size() {
+ return size;
+ }
+
+ /**
+ * @return the number of mutations.
+ */
+ public long count() {
+ return count;
+ }
+
+ /**
+ * Use the caller's thread to submit. Note that this can block.
+ * @param bm the buffered mutator on which to submit this. Cannot be null.
+ * @throws IOException if a remote or network exception occurs.
+ */
+ public void submit(BufferedMutator bm) throws IOException {
+ submitNanonTime = System.nanoTime();
+ completionNanonTime = 0;
+ switch (submissionType) {
+ case CLOSE:
+ bm.close();
+ break;
+ case FLUSH:
+ bm.flush();
+ break;
+ case MUTATE:
+ bm.mutate(mutations);
+ break;
+ default:
+ break;
+ }
+ completionNanonTime = System.nanoTime();
+ }
+
+ /**
+ * @return the submitNanonTime. Can be 0 if not yet submitted.
+ */
+ public long getSubmitNanonTime() {
+ return submitNanonTime;
+ }
+
+ /**
+ * @return the completionNanonTime. Can be 0 when not yet completed.
+ */
+ public long getCompletionNanonTime() {
+ return completionNanonTime;
+ }
+
+ /**
+ * To be called by the processor to indicate that this submission is ready to
+ * be spooled, whether that is actually spooling or dropping.
+ */
+ public void readyToFlush() {
+ if (flushLatch != null) {
+ flushLatch.countDown();
+ }
+ }
+
+ /**
+ * Causes the current thread to wait until {@link #readyToFlush()} is called
+ * for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and
+ * {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submissions, unless the
+ * thread is interrupted, or the specified waiting time elapses.
+ *
+ * For {@link SpoolingBufferedMutatorSubmissionType#MUTATE} submissions this
+ * method returns immediately with the value true.
+ *
+ * If {@link #readyToFlush()} has already been called then this method returns
+ * immediately with the value true. Otherwise the current thread becomes
+ * disabled for thread scheduling purposes and lies dormant until one of three
+ * things happens:
+ * <ul>
+ * <li>{@link #readyToFlush()} is called, in which case the method returns
+ * true; or</li>
+ * <li>some other thread interrupts the current thread; or</li>
+ * <li>the specified waiting time elapses, in which case false is returned.</li>
+ * </ul>
+ *
+ * If the current thread has its interrupted status set on entry to this
+ * method, or is interrupted while waiting, then InterruptedException is thrown
+ * and the current thread's interrupted status is cleared. If the time is less
+ * than or equal to zero, the method will not wait at all.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return true for {@link SpoolingBufferedMutatorSubmissionType#MUTATE}
+ * submissions; otherwise true if {@link #readyToFlush()} has been called
+ * and false if the waiting time elapsed first
+ * @throws InterruptedException if the current thread is interrupted while
+ * waiting
+ */
+ public boolean waitToFlush(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (flushLatch == null) {
+ // Should happen only for mutations
+ return true;
+ } else {
+ return flushLatch.await(timeout, unit);
+ }
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java
new file mode 100644
index 0000000..1c2b696
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+/**
+ * Used to indicate what this particular submission is intended for.
+ */
+enum SpoolingBufferedMutatorSubmissionType {
+
+ /**
+ * Indicate that a mutation is submitted.
+ */
+ MUTATE,
+ /**
+ * Indicate that buffers should be flushed.
+ */
+ FLUSH,
+ /**
+ * Indicate that mutators and output should be closed.
+ */
+ CLOSE;
+
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java
new file mode 100644
index 0000000..3b25cea
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * This class is responsible for submitting mutations to the BufferedMutator
+ * that could potentially hang for a while. This allows the code that submits
+ * this Callable to make progress and determine to spool when the
+ * BufferedMutator takes too long.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class SpoolingBufferedMutatorSubmitter
+ implements Callable<SpoolingBufferedMutatorSubmission> {
+
+ private final BufferedMutator bm;
+ private final SpoolingBufferedMutatorSubmission submission;
+ private final SpoolingBufferedMutatorCoordinator coordinator;
+
+ /**
+ *
+ * @param submission the submission to be submitted to the BufferedMutator
+ * @param bm where to submit mutations to
+ * @param coordinator to report the outcome of the submission to
+ */
+ public SpoolingBufferedMutatorSubmitter(
+ SpoolingBufferedMutatorSubmission submission, BufferedMutator bm,
+ SpoolingBufferedMutatorCoordinator coordinator) {
+ this.submission = submission;
+ this.bm = bm;
+ this.coordinator = coordinator;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.concurrent.Callable#call()
+ */
+ @Override
+ public SpoolingBufferedMutatorSubmission call() throws Exception {
+ Exception cause = null;
+ try {
+ // Delegate this to the submission in order to track timing
+ submission.submit(bm);
+ } catch (Exception e) {
+ cause = e;
+ }
+ // Progress has to be reported from the submitter, because the coordinator
+ // might have moved on to another item, or perhaps it is blocked and waiting
+ // for a next submission to come in. We need to be able to recover from a
+ // bad state.
+ coordinator.report(submission, cause);
+ if (cause != null) {
+ throw cause;
+ }
+ return submission;
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java
new file mode 100644
index 0000000..da92dcd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java
@@ -0,0 +1,159 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the
+ * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * The processor is responsible for taking items from the inbound queue,
+ * presenting them to the submitter, and enqueuing them into the outbound queue.
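+ *
+ * A sketch of the overall flow this processor participates in (spooling or
+ * dropping from the outbound queue is done by the flusher, not here):
+ *
+ * <pre>
+ *   caller --put--> inbound --take--> processor --submit--> wrapped BufferedMutator
+ *                                         |
+ *                                         +--put--> outbound --take--> flusher --> spool or drop
+ * </pre>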
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class SpoolingBufferingMutatorProcessor implements Callable<Void> {
+
+ private static final Log LOG =
+ LogFactory.getLog(SpoolingBufferingMutatorProcessor.class);
+
+ final BlockingQueue<SpoolingBufferedMutatorSubmission> inbound;
+ final BlockingQueue<SpoolingBufferedMutatorSubmission> outbound;
+
+ private final BufferedMutator bm;
+ private final ExecutorService es;
+ private final SpoolingBufferedMutatorCoordinator coordinator;
+
+ private volatile boolean stopped = false;
+
+ /**
+ * @param inbound queue where submissions are entered on
+ * @param outbound where submissions are stashed until they can get spooled or
+ * dropped (once we can confirm that all mutations were successfully
+ * flushed).
+ * @param bm the buffered mutator on which the
+ * {@link SpoolingBufferedMutatorSubmitter} should submit items
+ * @param es used to submit {@link SpoolingBufferedMutatorSubmitter} tasks to
+ * talk to the wrapped BufferedMutator. The caller is to manage the
+ * lifecycle for this ExecutorService.
+ * @param coordinator used to decide how long to wait for each submission.
+ */
+ public SpoolingBufferingMutatorProcessor(
+ BlockingQueue<SpoolingBufferedMutatorSubmission> inbound,
+ BlockingQueue<SpoolingBufferedMutatorSubmission> outbound,
+ BufferedMutator bm, ExecutorService es,
+ SpoolingBufferedMutatorCoordinator coordinator) {
+ // TODO: check arguments
+ this.inbound = inbound;
+ this.outbound = outbound;
+ this.bm = bm;
+ this.es = es;
+ this.coordinator = coordinator;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.concurrent.Callable#call()
+ */
+ @Override
+ public Void call() throws Exception {
+
+ SpoolingBufferedMutatorSubmission submission = null;
+ // Time, in the coordinator's time unit, for which to remain waiting for
+ // the future to complete
+ long timeout = 1;
+ Future<SpoolingBufferedMutatorSubmission> future = null;
+ while (!stopped) {
+
+ submission = inbound.take();
+ outbound.put(submission);
+ timeout = coordinator.getTimeout(submission);
+
+ // Keep waiting until we're told to no longer wait.
+ // Not waiting when the future isn't done means we're moving submissions
+ // to the outbound queue without submitting them, in other words when
+ // they need to be spooled.
+ while (timeout > 0) {
+
+ if (future == null) {
+ // Submit the next item. Submit a new item only if the previously
+ // submitted one was done. Otherwise, we're bypassing.
+ SpoolingBufferedMutatorSubmitter submitter =
+ new SpoolingBufferedMutatorSubmitter(submission, bm, coordinator);
+ try {
+ future = es.submit(submitter);
+ } catch (RejectedExecutionException ree) {
+ LOG.error("Cannot submit mutation submittion.", ree);
+ // TODO: deal with this.
+ }
+ }
+
+ try {
+ future.get(timeout, coordinator.getTimeUnit());
+ } catch (CancellationException ce) {
+ // The submission future was cancelled; treat it as a signal to stop.
+ LOG.info(
+ "SpoolingBufferedMutatorProcessor submission was cancelled, yielding.");
+ stopped = true;
+ Thread.interrupted();
+ break;
+ } catch (ExecutionException ee) {
+ LOG.debug("SpoolingBufferedMutatorProcessor submission failed.",
+ ee.getCause());
+ // TODO: determine what we do next
+ } catch (InterruptedException ie) {
+ // We got interrupted, this means that we should yield
+ LOG.info(
+ "SpoolingBufferedMutatorProcessor got interrupted, yielding.");
+ stopped = true;
+ Thread.currentThread().interrupt();
+ break;
+ } catch (TimeoutException te) {
+ timeout = coordinator.getTimeout(submission);
+ }
+
+ // indicate that we're done with this submission.
+ if (future.isDone()) {
+ // Wipe out the future, so that we can submit the next item
+ future = null;
+ break;
+ }
+
+ } // while waiting for the currentSubmission to finish
+
+ // The submission has either finished or timed out; indicate that it is
+ // time to move on.
+ submission.readyToFlush();
+
+ } // while not stopped
+ // The only way to get here is when we need to shut down.
+ bm.close();
+ return null;
+ }
+
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockBufferedMutatorImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockBufferedMutatorImpl.java
new file mode 100644
index 0000000..598c741
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockBufferedMutatorImpl.java
@@ -0,0 +1,179 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Mock BufferedMutator implementation used for testing purposes.
+ */
+class MockBufferedMutatorImpl implements BufferedMutator {
+
+ private final TableName tableName;
+ private long writeBufferSize;
+ private int rpcTimeout = 0;
+ private int operationTimeout = 0;
+
+ final BlockingQueue<List<? extends Mutation>> mutated =
+ new LinkedBlockingQueue<>();
+ private boolean closed;
+ private boolean flushed;
+
+ /**
+ * @param tableName the name reported by {@link #getName()}.
+ */
+ public MockBufferedMutatorImpl(TableName tableName) {
+ this.tableName = tableName;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#getName()
+ */
+ public TableName getName() {
+ return tableName;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#getConfiguration()
+ */
+ public Configuration getConfiguration() {
+ // Return null, we're not interested in this method.
+ return null;
+ }
+
+ /**
+ * Returns all mutated batches so far, resets the queue, and resets the closed
+ * and flushed attributes.
+ * @return the list of all mutation lists.
+ */
+ synchronized List<? super List<? extends Mutation>> getMutated() {
+ List<? super List<? extends Mutation>> results = new LinkedList<>();
+ mutated.drainTo(results);
+ flushed = false;
+ closed = false;
+ return results;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hbase.client.BufferedMutator#mutate(org.apache.hadoop.
+ * hbase.client.Mutation)
+ */
+ public synchronized void mutate(Mutation m) throws IOException {
+ try {
+ mutated.put(Collections.singletonList(m));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while spooling.", e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#mutate(java.util.List)
+ */
+ public synchronized void mutate(List<? extends Mutation> ms)
+ throws IOException {
+ try {
+ mutated.put(ms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while spooling.", e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#close()
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ this.closed = true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#flush()
+ */
+ public synchronized void flush() throws IOException {
+ this.flushed = true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#getWriteBufferSize()
+ */
+ public long getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.BufferedMutator#setRpcTimeout(int)
+ */
+ public void setRpcTimeout(int timeout) {
+ this.rpcTimeout = timeout;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hbase.client.BufferedMutator#setOperationTimeout(int)
+ */
+ public void setOperationTimeout(int timeout) {
+ this.operationTimeout = timeout;
+ }
+
+ /**
+ * @return whether {@link #flush()} was called since the last call to
+ * {@link #getMutated()}.
+ */
+ public boolean flushed() {
+ return flushed;
+ }
+
+ /**
+ * @return whether {@link #close()} was called since the last call to
+ * {@link #getMutated()}.
+ */
+ public boolean closed() {
+ return closed;
+ }
+
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockMutationSpooler.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockMutationSpooler.java
new file mode 100644
index 0000000..3ea65a6
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockMutationSpooler.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Mock implementation that simply queues all mutations to a queue so that they
+ * can be later returned and inspected.
+ */
+class MockMutationSpooler implements MutationSpooler {
+
+ final BlockingQueue<List<? extends Mutation>> spooled =
+ new LinkedBlockingQueue<>();
+ private boolean flushed = false;
+ private boolean closed = false;
+
+ /**
+ * To be used for testing.
+ */
+ MockMutationSpooler() {
+ }
+
+ /**
+ * Returns all spooled mutation batches, and resets the flushed and closed status.
+ * @return the list of all mutation lists.
+ */
+ synchronized List<? super List<? extends Mutation>> getSpooled() {
+ List<? super List<? extends Mutation>> results = new LinkedList<>();
+ spooled.drainTo(results);
+ spooled.clear();
+ flushed = false;
+ closed = false;
+ return results;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.hbase.client.MutationSpooler#mutate(org.apache.hadoop.
+ * hbase.client.Mutation)
+ */
+ public synchronized void mutate(Mutation m) throws IOException {
+ try {
+ spooled.put(Collections.singletonList(m));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while spooling.", e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(java.util.List)
+ */
+ public synchronized void mutate(List<? extends Mutation> ms)
+ throws IOException {
+ try {
+ spooled.put(ms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while spooling.", e);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.MutationSpooler#flush()
+ */
+ public synchronized void flush() throws IOException {
+ flushed = true;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.hadoop.hbase.client.MutationSpooler#close()
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ closed = true;
+ }
+
+ /**
+ * @return whether this spooler has been flushed since the last call to
+ * {@link #getSpooled()}
+ */
+ public boolean flushed() {
+ return flushed;
+ }
+
+ /**
+ * @return whether this spooler has been closed since the last call to
+ * {@link #getSpooled()}
+ */
+ public boolean closed() {
+ return closed;
+ }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorCoordinator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorCoordinator.java
new file mode 100644
index 0000000..4929ce0
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorCoordinator.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.CLOSE;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator.State;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestSpoolingBufferedMutatorCoordinator {
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testTransitions() {
+ SpoolingBufferedMutatorCoordinator coordinator =
+ new SpoolingBufferedMutatorCoordinator();
+ coordinator.setState(State.GOOD);
+ assertTrue(coordinator.getTimeout(null) > 0);
+
+ coordinator.setState(State.BAD);
+ coordinator.setState(State.TRANSITIONING);
+ coordinator.setState(State.BAD);
+ coordinator.setState(State.TRANSITIONING);
+ coordinator.setState(State.GOOD);
+
+ // Can't do GOOD->TRANSITIONING. This must be the last statement: nothing
+ // after the expected exception will run.
+ exception.expect(IllegalStateException.class);
+ coordinator.setState(State.TRANSITIONING);
+ }
+
+ @Test
+ public void testBadToGoodTransition() {
+ SpoolingBufferedMutatorCoordinator coordinator =
+ new SpoolingBufferedMutatorCoordinator();
+ coordinator.setState(State.BAD);
+
+ // Can't do BAD->GOOD
+ exception.expect(IllegalStateException.class);
+ coordinator.setState(State.GOOD);
+ }
+
+ @Test
+ public void testClose() {
+ // The timeout for a close should never be 0, no matter the state
+ SpoolingBufferedMutatorCoordinator coordinator =
+ new SpoolingBufferedMutatorCoordinator();
+ coordinator.setState(State.GOOD);
+ SpoolingBufferedMutatorSubmission closeSubmission =
+ new SpoolingBufferedMutatorSubmission(CLOSE, null, 0);
+ assertTrue(coordinator.getTimeout(closeSubmission) > 0);
+
+ coordinator.setState(State.BAD);
+ assertTrue(coordinator.getTimeout(closeSubmission) > 0);
+ }
+
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorImpl.java
new file mode 100644
index 0000000..cdb5cf8
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorImpl.java
@@ -0,0 +1,102 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator.State;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests {@link SpoolingBufferedMutatorImpl} against mock collaborators.
+ */
+@Category({ ClientTests.class, MediumTests.class })
+public class TestSpoolingBufferedMutatorImpl {
+
+ @Before
+ public void setUp() throws Exception {
+ // TODO: set up items before test
+ }
+
+ @Test
+ public void test() throws IOException {
+ TableName tableName = TableName.valueOf("someTable");
+ MockBufferedMutatorImpl wrapped = new MockBufferedMutatorImpl(tableName);
+ SpoolingBufferedMutatorCoordinator coordinator =
+ new SpoolingBufferedMutatorCoordinator();
+ MockMutationSpooler spooler = new MockMutationSpooler();
+ BufferedMutator sbmi =
+ new SpoolingBufferedMutatorImpl(wrapped, coordinator, spooler);
+ byte[] rowkey = "rowKey".getBytes();
+ byte[] family = "cf".getBytes();
+ byte[] qualifier = "cq1".getBytes();
+ byte[] value = "val1".getBytes();
+ Put p = new Put(rowkey);
+ p.setAttribute("a1", "v1".getBytes());
+ long timestamp = System.currentTimeMillis();
+ p.addColumn(family, qualifier, timestamp, value);
+ sbmi.mutate(p);
+
+ qualifier = "cq2".getBytes();
+ value = "val2".getBytes();
+ p.addColumn(family, qualifier, timestamp, value);
+ sbmi.mutate(p);
+
+ assertFalse(wrapped.flushed());
+ assertFalse(wrapped.closed());
+
+ sbmi.flush();
+
+ assertTrue(wrapped.flushed());
+ assertFalse(wrapped.closed());
+
+ List<? super List<? extends Mutation>> spooled = spooler.getSpooled();
+ assertEquals(0, spooled.size());
+
+ List<? super List<? extends Mutation>> mutated = wrapped.getMutated();
+ assertEquals(2, mutated.size());
+
+ coordinator.setState(State.BAD);
+
+ sbmi.mutate(p);
+ sbmi.flush();
+
+ mutated = wrapped.getMutated();
+ assertEquals(0, mutated.size());
+
+ spooled = spooler.getSpooled();
+ assertEquals(1, spooled.size());
+
+ sbmi.close();
+ assertTrue(spooler.closed());
+ assertTrue(wrapped.closed());
+
+ }
+
+}