diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java new file mode 100644 index 0000000..9a0740a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * This class is responsible to spool mutations to the configured filesystem. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class FilesystemMutationSpooler implements MutationSpooler { + + /** + * + */ + public FilesystemMutationSpooler() { + // TODO Auto-generated constructor stub + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(org.apache.hadoop.hbase.client.Mutation) + */ + @Override + public void mutate(Mutation m) throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(java.util.List) + */ + @Override + public void mutate(List ms) throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#flush() + */ + @Override + public void flush() throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#close() + */ + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java new file mode 100644 index 0000000..6d41ba4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Implementations are responsible to spool mutations. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface MutationSpooler { + + /** + * Spool a {@link Mutation} to an external sink. Currently only supports + * {@link Put} and {@link Delete} mutations. + * + * @param mutation The data to send. + * @throws IOException if a remote or network exception occurs. + */ + public void mutate(Mutation m) throws IOException; + + /** + * Spool a {@link Mutation}s to an external sink. Currently only supports + * {@link Put} and {@link Delete} mutations. + * + * @param mutation The data to send. + * @throws IOException if a remote or network exception occurs. + */ + public void mutate(List ms) throws IOException; + + /** + * Executes all the buffered, asynchronous {@link Mutation} operations and + * waits until they are done. + * + * @throws IOException if a remote or network exception occurs. + */ + public void flush() throws IOException; + + /** + * Performs a {@link #flush()} and releases any resources held. + * + * @throws IOException if a remote or network exception occurs. + */ + public void close() throws IOException; + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java new file mode 100644 index 0000000..208d43b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java @@ -0,0 +1,246 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class keeps track of HBase state, and determines how long to wait for a + * submission. + *
<p>
+ * Note that we can get extra information from an exception handler, and that + * can help us determine that HBase is bad and/or back up again. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class SpoolingBufferedMutatorCoordinator { + + private final TimeUnit TU = MILLISECONDS; + + /** + * TODO: determine exact value, for example 100 seconds. TODO: make this + * configurable + */ + private final long submitTimeoutMillis = 100000; // 100 seconds + + /** + * The timeout increment to wait, after which we're asked again how much + * additional time to wait. We will not get control back for up to this time. + *
<p>
+ * Note: should be strictly smaller than submitTimeoutMillis. + *
<p>
+ * TODO: make this configurable + */ + private final long timeoutIncrementMillis = 1000; // 1 second + + /** + * Capture the states that HBase can be in. + */ + @VisibleForTesting + enum State { + /** + * HBase has some issues. Spool data, and don't wait for mutations to + * complete. Next states {TRANSITIONING} + */ + BAD, + /** + * HBase is in a good state, wait the regular time for mutations to + * complete. Do not spool. Next states {BAD} + */ + GOOD, + /** + * HBase connection was bad before. Still spool, because we don't know + * exactly which mutations went to HBase, but do wait for mutations to + * complete, so that we're not bypassing submitting to the BufferedMutator. + * Next states {BAD,GOOD} + */ + TRANSITIONING; + + /** + * @param newState the state to transition to + * @return the new state if allowed + * @throws IllegalStateException if the transition isn't allowed. + */ + public State transition(State newState) { + // Disallow illegal transitions + switch (this) { + case BAD: + if (newState == State.GOOD) { + throw new IllegalStateException( + "Cannot transition from " + this + " to " + newState); + } + break; + case GOOD: + if (newState == State.TRANSITIONING) { + throw new IllegalStateException( + "Cannot transition from " + this + " to " + newState); + } + break; + default: + break; + } + return newState; + } + + } + + /** + * Keep track of current state. Don't manipulate directly, + */ + protected volatile State state = State.GOOD; + + /** + * TODO: determine if the caller managers the lifecycle, or we do. + * @param spoolingES Non, null executor service on which to schedule spooling. + */ + public SpoolingBufferedMutatorCoordinator() { + } + + /** + * Transitions to a new state, if that state is legal. + * @param newState the new state to transition to. + */ + @VisibleForTesting + protected void setState(State newState) { + state = state.transition(newState); + } + + /** + * How much longer processing needs to wait for the current submission to be + * processed. A value of 0 indicates that HBase must be in a bad state and + * processing doesn't have to wait. Therefore, outbound items must be spooled, + * not dropped. Any value larger than 0 means that the processing must wait + * for completion for at least the specified amount of time or ask again. + * + * @param submission + * @return the time in {@link #getTimeUnit()} units to wait or 1 if submission is null + */ + public synchronized long getTimeout( + SpoolingBufferedMutatorSubmission submission) { + + if (submission == null) { + return 1; + } + + // Even in BAD state we want to ensure that the close gets submitted. + if ((state == State.BAD) && (submission + .submissionType() != SpoolingBufferedMutatorSubmissionType.CLOSE)) { + return 0; + } else { + // We're in GOOD or TRANSITIONING state, or this is a CLOSE, so wait for + // submission to go through + // the BM. + + // TODO: perhaps we want to wait longer for larger submissions. + long size = submission.size(); + long count = submission.count(); + + long submitTime = submission.getSubmitNanonTime(); + + if (submitTime <= 0) { + // The submission hasn't been added to the BufferedMutator yet. + return timeoutIncrementMillis; + } else { + long now = System.nanoTime(); + // We're measuring in nanos, but probably returning in a different unit. 
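+ // For example, with TU = MILLISECONDS an elapsed gap of 2,500,000 ns becomes
+ // TU.convert(2_500_000L, TimeUnit.NANOSECONDS) == 2, i.e. whole milliseconds
+ // with the sub-millisecond remainder truncated.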
+ long waitTime = TU.convert(now - submitTime, TimeUnit.NANOSECONDS); + if (waitTime < submitTimeoutMillis) { + return timeoutIncrementMillis; + } else { + // We had to wait too long for this submission to go through, assume + // HBase is in bad shape and start spooling. + setState(State.BAD); + return 0; + } + } + } + + } + + /** + * @return the time unit to use for waiting. + */ + public TimeUnit getTimeUnit() { + return TU; + } + + /** + * @param submission determines the flush count up to which we should spool or + * drop. + * @return whether outbound items in the batch represented by submission must + * be spooled or dropped. + */ + public boolean shouldSpool(SpoolingBufferedMutatorSubmission submission) { + // TODO: should we keep track of the last successful flush that went through + // and perhaps still drop some items from outbound, even if we just went + // into a bad state? + if (state == State.GOOD) { + return false; + } else { + // Spool for both BAD and TRANSITIONING states. + return true; + } + } + + /** + * Report that a submission is done. + * @param result the result of the submission + * @param cause if any exceptions were thrown. + */ + public void report(SpoolingBufferedMutatorSubmission result, + Throwable cause) { + // TODO: put intelligence here to clean info from the cause to determine if + // HBase is down, and how severe the problem is. + + switch (state) { + case BAD: + if (cause == null) { + setState(State.TRANSITIONING); + } else { + // TODO: stay in bad state, perhaps capture the time and count of + // remaining in + // bad state. + } + break; + case GOOD: + if (cause == null) { + // TODO: remain in good state. Perhaps capture count and time in good + // state. + } else { + // TODO: are we going to transition to bad state upon one single error? + setState(State.BAD); + } + break; + case TRANSITIONING: + if (cause == null) { + // TODO: determine when to transition back to good. Presumably we want 2 + // or more good flushes before we transition back to good. + setState(State.GOOD); + } else { + setState(State.BAD); + } + break; + } + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java new file mode 100644 index 0000000..160a6f4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Used to Flush one batch of mutations from the outbound queue. 
Mutations can + * be spooled or dropped. + */ +public class SpoolingBufferedMutatorFlusher + implements Callable { + + private final BlockingQueue outbound; + private final SpoolingBufferedMutatorCoordinator coordinator; + + private final MutationSpooler spooler; + private final SpoolingBufferedMutatorSubmission submission; + + /** + * @param submission the {@link SpoolingBufferedMutatorSubmissionType#FLUSH} + * or {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submission + * to flush out of the outbound queue. We need to wait on the + * submission to be ready to spool. It may not have made it in the + * queue yet, and/or we may not yet have decided whether to drop or + * spool this batch of mutations. Cannot be a mutate submission. + * Cannot be null. + * @param outbound the queue from which to flush submissions. + * @param coordinator that helps determine whether itesms should be spooled or + * dropped. + * @param spooler used to flush, if the coordinator indicates so. + */ + public SpoolingBufferedMutatorFlusher( + SpoolingBufferedMutatorSubmission submission, + BlockingQueue outbound, + SpoolingBufferedMutatorCoordinator coordinator, MutationSpooler spooler) { + // TODO: validate inputs + this.submission = submission; + this.outbound = outbound; + this.coordinator = coordinator; + this.spooler = spooler; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.Callable#call() + */ + public SpoolingBufferedMutatorSubmission call() throws Exception { + + // TODO: configure the wait time for the flush to go through and decide what + // to do if we time out. + long timeout = Long.MAX_VALUE; + TimeUnit tu = TimeUnit.SECONDS; + + // We need to wait because the BufferedMutatorImpl has enqueued the flush, + // but we don't yet know if it has been processed and whether it was + // successfully submitted. + submission.waitToFlush(timeout, tu); + + // Flush items up to this flushCount, but none later. Items could have been + // enqueued into inbound (and therefore outbound) out of order. + long maxFlushCount = submission.flushCount(); + + // Coordinator determines what to do. + boolean shouldSpool = coordinator.shouldSpool(submission); + + // We cannot quite drain the entire queue because a) out of order items + // might be in there, and b) items could be added after we started + // iterating. + int outboundSize = outbound.size(); + + SpoolingBufferedMutatorSubmission s; + for (int i = outboundSize; i > 0; i--) { + // TODO: should we use poll and possibly time out? What to do if we do + // time out? + s = outbound.take(); + if (s.flushCount() > maxFlushCount) { + // Put it back for later processing. + outbound.put(s); + } else { + // TODO: should we always close, no matter whether we should flush or + // not? + if (shouldSpool) { + switch (s.submissionType()) { + case CLOSE: + spooler.close(); + break; + case FLUSH: + spooler.flush(); + break; + case MUTATE: + spooler.mutate(s.mutations()); + break; + default: + // TODO: log some sort of error message. + break; + } + } else { + // Do no spool, just drop the record on the ground. + } + // TODO: the flusher should probably not drop any flushes other than the + // its "own" flush. + } + } // for loop over outbound queue + + // TODO: perhaps add some debugging with counts here. + + // Return so that the caller (SpoolingBufferedMutatorImpl) knows this flush + // or close have been completed.. 
+ return submission; + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java new file mode 100644 index 0000000..4a1d77b --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java @@ -0,0 +1,462 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.CLOSE; +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.FLUSH; +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.MUTATE; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +import com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SpoolingBufferedMutatorImpl implements BufferedMutator { + + private static final Log LOG = + LogFactory.getLog(SpoolingBufferedMutatorImpl.class); + + /** + * Queue to hand off to submission. + */ + @VisibleForTesting + final BlockingQueue inbound = + new LinkedBlockingQueue<>(); + + /** + * Queue to track submissions that need to possibly be spooled. + */ + @VisibleForTesting + final BlockingQueue outbound = + new LinkedBlockingQueue<>(); + + private final BufferedMutator wrapped; + private volatile boolean closed = false; + + /** + * Track the number of flushes we see, so that a batch of submissions are + * marked with the same flushCount. Note that multiple threads can mutate, so + * mutations of different flushCounts could be queued out of order. 
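+ * For example, a writer thread can read flushCount N in mutate() and be
+ * preempted before its put() into the inbound queue, while another thread's
+ * flush() advances the counter to N + 1 and enqueues first; the N-tagged
+ * batch then sits behind N + 1 submissions in the queue.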
+ */ + private final AtomicLong flushCount = new AtomicLong(0); + + /** + * Keeps track of the HBase state + */ + private final SpoolingBufferedMutatorCoordinator coordinator; + + private final TableName tableName; + private volatile Configuration conf; + private long writeBufferSize; + private final int maxKeyValueSize; + private final AtomicLong currentBufferSize = new AtomicLong(0); + AtomicInteger undealtMutationCount = new AtomicInteger(0); + + /** + * Only one single process will be running here (at a time). The executor + * service will re-spawn a thread if it crashes. + */ + ExecutorService processES = Executors.newSingleThreadExecutor(); + + /** + * Used to push submissions into the wrapped BufferedMutator. + */ + ExecutorService submitES = Executors.newSingleThreadExecutor(); + + /** + * Used to run flush requests, one per flush. + */ + ExecutorService flushES = Executors.newSingleThreadExecutor(); + + /** + * Hang on to this, in order to be able to cancel. + */ + private final Future processResult; + + private final MutationSpooler spooler; + + // Volatile, because possibly set from a different thread and access not + // otherwise synchronized + private volatile int operationTimeout; + + /** + * + * @param bm the buffered mutator that will be delegated to. + * @param c + * @param params non-null params. + */ + @VisibleForTesting + SpoolingBufferedMutatorImpl(BufferedMutator bm, + SpoolingBufferedMutatorCoordinator c, MutationSpooler spooler, + BufferedMutatorParams params) { + if (bm == null) { + throw new IllegalArgumentException( + "Wrapped BufferedMutator cannot be null."); + } + if (params == null) { + throw new IllegalArgumentException("params cannot be null"); + } + + this.tableName = params.getTableName(); + this.writeBufferSize = + params.getWriteBufferSize() != BufferedMutatorParams.UNSET + ? params.getWriteBufferSize() : Long.MAX_VALUE; + this.maxKeyValueSize = + params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET + ? params.getMaxKeyValueSize() : Integer.MAX_VALUE; + this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT; + + this.wrapped = bm; + if (c == null) { + this.coordinator = new SpoolingBufferedMutatorCoordinator(); + } else { + this.coordinator = c; + } + + this.spooler = spooler; + + SpoolingBufferingMutatorProcessor processor = + new SpoolingBufferingMutatorProcessor(inbound, outbound, wrapped, + submitES, coordinator); + + // TODO: determine if we should create executor services, or let these be + // managed in the underlying structures. + + processResult = processES.submit(processor); + } + + /** + * Requests a BufferedMutator from the passed connection and wraps it. + * + * @param conn + * @param rpcCallerFactory ignored. + * @param rpcFactory ignored + * @param params non-null params used to pass to connection (after we strip + * off the class name that causes it to create this + * SpoolingBufferedMutator). User must define a + * {@link BufferedMutatorParams#writeBufferSize(long)} and + * {@link BufferedMutatorParams#maxKeyValueSize(int)} for the + * SpoolingBufferedMutatorImpl to enfore these, else enforcement is + * done only by the wrapped BufferedMutator. This means the + * additional spooling buffer can grow unbouned until the next flush + * and keyvalues could be rejected later. The wrapped BufferedMutator + * will default to what is configured on the table, but this class + * doesn't wait to connect to the table. 
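+ *        <p>
+ *        A minimal caller-side configuration sketch (the table name, the
+ *        buffer sizes, and {@code conf} are illustrative, not required
+ *        values):
+ *        <pre>
+ *        BufferedMutatorParams params =
+ *            new BufferedMutatorParams(TableName.valueOf("mytable"))
+ *                .writeBufferSize(2 * 1024 * 1024)
+ *                .maxKeyValueSize(1024 * 1024)
+ *                .implementationClassName(
+ *                    SpoolingBufferedMutatorImpl.class.getName());
+ *        Connection connection = ConnectionFactory.createConnection(conf);
+ *        BufferedMutator mutator = connection.getBufferedMutator(params);
+ *        </pre>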
+ * @throws IOException + */ + SpoolingBufferedMutatorImpl(ClusterConnection conn, + RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcFactory, BufferedMutatorParams params) + throws IOException { + + if (params == null) { + throw new IllegalArgumentException("params cannot be null"); + } + + this.tableName = params.getTableName(); + this.writeBufferSize = + params.getWriteBufferSize() != BufferedMutatorParams.UNSET + ? params.getWriteBufferSize() : Long.MAX_VALUE; + this.maxKeyValueSize = + params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET + ? params.getMaxKeyValueSize() : Integer.MAX_VALUE; + + this.operationTimeout = conn.getConfiguration().getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + + BufferedMutatorParams clonedParams = params.clone(); + + // Wipe out the reference to this class to avoid an infinite loop + clonedParams.implementationClassName(null); + + // TODO: is it possible to have the BufferedMutator creation to hang / fail + // and therefore end up in a bad state to begin with? If so, we should + // change the signature and let the Processor or Submitter create the + // connection, perhaps as a separate SubmissionType. That way we won't stall + // if we want to connect to an HBase cluster that is already down, and we'd + // start spooling right away. + // TODO: Similarly, we probably want to ensure that we can connect (?) to + // the spooling filesystem. + this.wrapped = conn.getBufferedMutator(clonedParams); + this.coordinator = new SpoolingBufferedMutatorCoordinator(); + + // TODO: read some config variable to make this pluggable. + spooler = new FilesystemMutationSpooler(); + + SpoolingBufferingMutatorProcessor processor = + new SpoolingBufferingMutatorProcessor(inbound, outbound, wrapped, + submitES, coordinator); + + // TODO: determine if we should create executor services, or let these be + // managed in the underlying structures. + + processResult = processES.submit(processor); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getName() + */ + public TableName getName() { + return tableName; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getConfiguration() + */ + public Configuration getConfiguration() { + return wrapped.getConfiguration(); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hbase.client.BufferedMutator#mutate(org.apache.hadoop. 
+ * hbase.client.Mutation) + */ + public void mutate(Mutation m) throws IOException { + mutate(Collections.singletonList(m)); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#mutate(java.util.List) + */ + public void mutate(List ms) throws IOException { + if (closed) { + throw new IllegalStateException( + "Cannot put when the BufferedMutator is closed."); + } + + if (ms == null) { + return; + } + long toAddSize = 0; + int toAddCount = 0; + for (Mutation m : ms) { + if (m instanceof Put) { + validatePut((Put) m); + } + toAddSize += m.heapSize(); + ++toAddCount; + } + long previousBufferSize = currentBufferSize.getAndAdd(toAddSize); + undealtMutationCount.addAndGet(toAddCount); + + SpoolingBufferedMutatorSubmission s = + new SpoolingBufferedMutatorSubmission(MUTATE, ms, flushCount.get()); + try { + inbound.put(s); + } catch (InterruptedException e) { + LOG.error("Interrupted while trying to mutate.", e); + // Yield and preserve status + Thread.currentThread().interrupt(); + } + + // Mutate isn't synchronized, so multiple mutations could be added before + // the flush. + // Only for the mutation list that crossed over the writeBufferSize should + // trigger a flush. + if ((currentBufferSize.get() > writeBufferSize) + && (previousBufferSize < writeBufferSize)) { + // Do not block + // Note it is still possible for the BufferedMutator to also reach the + // same conclusion that its buffers need to be flushed, so then our flush + // simply flushes one more time. Flushing empty buffers should be cheap. + // It may be possible that a couple more mutations raced through, in which + // case they'll be flushed by themselves. + // At least our outbound queue will be limited in size. + flush(false); + } + } + + // validate for well-formedness + private void validatePut(final Put put) throws IllegalArgumentException { + HTable.validatePut(put, maxKeyValueSize); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#close() + */ + public synchronized void close() throws IOException { + + if (closed) { + return; + } + // TODO: ensure that the close adheres to the operationTimeout (once I find + // if that is in millis) + + // Make sure that all data is flushed + flush(true); + + SpoolingBufferedMutatorSubmission s = + new SpoolingBufferedMutatorSubmission(CLOSE, null, flushCount.get()); + try { + inbound.put(s); + + waitForFlusher(s); + + processES.shutdown(); + submitES.shutdown(); + flushES.shutdown(); + + processResult.get(); + + // TODO: confirm this is actually in millis, plus keep track of remaining + // timeout. + if (!processES.awaitTermination(operationTimeout, TimeUnit.MILLISECONDS)) { + processES.shutdownNow(); + } + if (!submitES.awaitTermination(operationTimeout, TimeUnit.MILLISECONDS)) { + submitES.shutdownNow(); + } + if (!flushES.awaitTermination(operationTimeout, TimeUnit.MILLISECONDS)) { + flushES.shutdownNow(); + } + + } catch (InterruptedException e) { + LOG.error("Interrupted while trying to close.", e); + // Yield and preserve status + Thread.currentThread().interrupt(); + } catch (ExecutionException ee) { + LOG.error("ExecutionException while waiting for shutdown.", ee); + } finally { + this.closed = true; + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#flush() + */ + public synchronized void flush() throws IOException { + flush(true); + } + + /** + * Enqueue a flush operation and create a spooling submission. 
+ * @param synchronous indicate whether flush should wait for the flush to + * complete. + * @throws IOException if a remote or network exception occurs. + */ + public void flush(boolean synchronous) throws IOException { + if (closed) { + throw new IllegalStateException( + "Cannot flush when the BufferedMutator is closed."); + } + SpoolingBufferedMutatorSubmission s = new SpoolingBufferedMutatorSubmission( + FLUSH, null, flushCount.incrementAndGet()); + try { + inbound.put(s); + if (synchronous) { + waitForFlusher(s); + } + } catch (InterruptedException e) { + LOG.error("Interrupted while trying to flush.", e); + // Yield and preserve status + Thread.currentThread().interrupt(); + } + } + + /** + * Wait for the flush to be submitted (or timedout). + * @param s the flush or close submission + * @throws InterruptedException + */ + private void waitForFlusher(SpoolingBufferedMutatorSubmission s) + throws InterruptedException { + SpoolingBufferedMutatorFlusher flusher = + new SpoolingBufferedMutatorFlusher(s, outbound, coordinator, spooler); + // Wait for the close to complete before returning. + Future result = flushES.submit(flusher); + + // TODO: properly handle all exceptions and deal with timeouts. + try { + // Block until this flush is done. + result.get(); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getWriteBufferSize() + */ + @Override + public long getWriteBufferSize() { + return writeBufferSize; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#setRpcTimeout(int) + */ + @Override + public void setRpcTimeout(int timeout) { + // TODO: do not refer to wrapped, since it may not have been initialized. + // TODO: should this be synchronized? + wrapped.setOperationTimeout(timeout); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hbase.client.BufferedMutator#setOperationTimeout(int) + */ + @Override + public void setOperationTimeout(int timeout) { + this.operationTimeout = timeout; + // TODO: file a bug in HBase to clarify in the javadoc what timeunit this + // timeout is supposed to be. Milliseconds? + wrapped.setOperationTimeout(timeout); + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java new file mode 100644 index 0000000..d92bf46 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java @@ -0,0 +1,228 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.MUTATE; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Used to track mutations, flush, and close actions on the + * {@link SpoolingBufferedMutatorImpl} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class SpoolingBufferedMutatorSubmission { + + private final SpoolingBufferedMutatorSubmissionType submissionType; + private final List mutations; + private final long flushCount; + /** + * The cumulative heap-size of the list of mutations. + */ + private final long size; + /** + * The number of mutations that this submission consists of. + */ + private final long count; + + private volatile long submitNanonTime = 0; + private volatile long completionNanonTime = 0; + + /** + * Used so that the spooler thread known whether this flush or close mutation + * has made it through submission (or timed out). + */ + private final CountDownLatch flushLatch; + + /** + * @param submissionType non-null type + * @param mutations Cannot be null for + * {@link SpoolingBufferedMutatorSubmissionType#MUTATE}, but is + * ignored for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} + * and {@link SpoolingBufferedMutatorSubmissionType#CLOSE} + * @param flushCount indicating which batch this submission is part of + */ + public SpoolingBufferedMutatorSubmission( + SpoolingBufferedMutatorSubmissionType submissionType, + List mutations, long flushCount) { + if (submissionType == null) { + throw new IllegalArgumentException("Submissiontype cannot be null."); + } + this.submissionType = submissionType; + + if (submissionType == MUTATE) { + if (mutations == null) { + throw new IllegalArgumentException("Cannot mutate nothing."); + } + this.mutations = mutations; + long toAddSize = 0; + int toAddCount = 0; + for (Mutation m : mutations) { + toAddSize += m.heapSize(); + ++toAddCount; + } + size = toAddSize; + count = toAddCount; + flushLatch = null; + } else { + this.mutations = null; + this.size = 0; + this.count = 0; + flushLatch = new CountDownLatch(1); + } + + this.flushCount = flushCount; + } + + public SpoolingBufferedMutatorSubmissionType submissionType() { + return submissionType; + } + + /** + * @return mutations. Cannot be null for + * {@link SpoolingBufferedMutatorSubmissionType#MUTATE}, but is null + * for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and + * {@link SpoolingBufferedMutatorSubmissionType#CLOSE} + */ + public List mutations() { + return mutations; + } + + public long flushCount() { + return flushCount; + } + + /** + * @return the total heap size of all mutations. + */ + public long size() { + return size; + } + + /** + * @return the number of mutations. + */ + public long count() { + return count; + } + + /** + * Use the caller's thread to submit. Note that this can block. + * @param bm the buffered mutator on which to submit this. Cannot be null. + * @throws IOException if a remote or network exception occurs. 
+ */ + public void submit(BufferedMutator bm) throws IOException { + submitNanonTime = System.nanoTime(); + completionNanonTime = 0; + switch (submissionType) { + case CLOSE: + bm.close(); + break; + case FLUSH: + bm.flush(); + break; + case MUTATE: + bm.mutate(mutations); + break; + default: + break; + } + completionNanonTime = System.nanoTime(); + } + + /** + * @return the submitNanonTime. Can be 0 if not yet submitted. + */ + public long getSubmitNanonTime() { + return submitNanonTime; + } + + /** + * @return the completionNanonTime. Can be 0 when not yet completed. + */ + public long getCompletionNanonTime() { + return completionNanonTime; + } + + /** + * To be called by the processor to indicate that this submission is ready to + * be spooled, whether that is actually spooling or dropping. + */ + public void readyToFlush() { + if (flushLatch != null) { + flushLatch.countDown(); + } + } + + /** + * Causes the current thread to wait until {@link #readyToFlush()} is called + * for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and + * {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submissions, unless the + * thread is interrupted, or the specified waiting time elapses. + * + * For {@link SpoolingBufferedMutatorSubmissionType#MUTATE} then this method + * returns immediately with the value true. + * + * If {@link #readyToFlush()} has already been called then this method returns + * immediately with the value true. + * + * For {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and + * {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submissions, if + * {@link #readyToFlush()} has not yet been called then the current thread + * becomes disabled for thread scheduling purposes and lies dormant until one + * of three things happen: + * + * The {@link #readyToFlush()} is called; or Some other thread interrupts the + * current thread; or The specified waiting time elapses. If + * {@link #readyToFlush()} is called then the method returns with the value + * true. + * + * If the current thread: + * + * has its interrupted status set on entry to this method; or is interrupted + * while waiting, then InterruptedException is thrown and the current thread's + * interrupted status is cleared. If the specified waiting time elapses then + * the value false is returned. If the time is less than or equal to zero, the + * method will not wait at all. 
+ * + * Parameters: Throws: InterruptedException - if the current thread is + * interrupted while waiting + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return Returns: true for + * {@link SpoolingBufferedMutatorSubmissionType#MUTATE} and otherwise + * if {@link #readyToFlush()} has been called and false if the waiting + * time elapsed before the count reached zero + * @throws InterruptedException + */ + public boolean waitToFlush(long timeout, TimeUnit unit) + throws InterruptedException { + if (flushLatch == null) { + // Should happen only for mutations + return true; + } else { + return flushLatch.await(timeout, unit); + } + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java new file mode 100644 index 0000000..1c2b696 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Used to indicte what this particular submission is intended for. + */ +enum SpoolingBufferedMutatorSubmissionType { + + /** + * Indicate that a mutation is submitted. + */ + MUTATE, + /** + * Indicate that buffers should be flushed. + */ + FLUSH, + /** + * Indicate that mutators and output should be closed. + */ + CLOSE; + + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java new file mode 100644 index 0000000..3b25cea --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java @@ -0,0 +1,79 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * This class is responsible for submitting mutations to the BufferedMutator + * that could potentially hang for a while. This allows the code that submits + * this Callable to make progress and determine to spool when the + * BufferedMutator takes too long. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class SpoolingBufferedMutatorSubmitter + implements Callable { + + private final BufferedMutator bm; + private final SpoolingBufferedMutatorSubmission submission; + private final SpoolingBufferedMutatorCoordinator coordinator; + + /** + * + * @param mutation to be submitted to the BufferedMutator + * @param bm where to submit mutations to + * @param coordinator + * @param flushCount Indicator of the flushCount that this mutation is part + * of. + */ + public SpoolingBufferedMutatorSubmitter( + SpoolingBufferedMutatorSubmission submission, BufferedMutator bm, + SpoolingBufferedMutatorCoordinator coordinator) { + this.submission = submission; + this.bm = bm; + this.coordinator = coordinator; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.Callable#call() + */ + @Override + public SpoolingBufferedMutatorSubmission call() throws Exception { + Exception cause = null; + try { + // Delegate this to the submission in order to track timing + submission.submit(bm); + } catch (Exception e) { + cause = e; + } + // Progress has to be reported from the submitter, because the coordinator + // might have moved on to another item, or perhaps it is blocked and waiting + // for a next submission to come in. We need to be able to recover from a + // bad state. + coordinator.report(submission, cause); + if (cause != null) { + throw cause; + } + return submission; + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java new file mode 100644 index 0000000..45f4987 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java @@ -0,0 +1,166 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The processor is responsible to take items from the inbound queue, present + * them to the submitter, and enqueue them into the outbound queue. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class SpoolingBufferingMutatorProcessor implements Callable { + + private static final Log LOG = + LogFactory.getLog(SpoolingBufferingMutatorProcessor.class); + + final BlockingQueue inbound; + final BlockingQueue outbound; + + private final BufferedMutator bm; + private final ExecutorService es; + private final SpoolingBufferedMutatorCoordinator coordinator; + + private volatile boolean closed = false; + + /** + * @param inbound queue where submissions are entered on + * @param outbound where submissions are stashed until they can get spooled or + * dropped (once we can confirm that all mutations were successfully + * flushed). + * @param bm the buffered mutator on which the + * {@link SpoolingBufferedMutatorSubmitter} should submit items + * @param es used to submit {@link SpoolingBufferedMutatorSubmitter} tasks to + * talk to the wrapped BufferedMutator. The caller is to manage the + * lifecycle for this ExecutorService. + * + */ + public SpoolingBufferingMutatorProcessor( + BlockingQueue inbound, + BlockingQueue outbound, + BufferedMutator bm, ExecutorService es, + SpoolingBufferedMutatorCoordinator coordinator) { + // TODO: check arguments + this.inbound = inbound; + this.outbound = outbound; + this.bm = bm; + this.es = es; + this.coordinator = coordinator; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.Callable#call() + */ + @Override + public Void call() throws Exception { + + SpoolingBufferedMutatorSubmission submission = null; + // Time in nanoseconds for which to remain waiting for the future to + // complete + long timeout = 1; + Future future = null; + while (!closed) { + + submission = inbound.take(); + outbound.put(submission); + timeout = coordinator.getTimeout(submission); + + // Keep waiting until we're told to no longer wait. + // Not waiting when the future isn't done means we're moving submissions + // to the outbound queue without submitting them, in other words when + // they need to be spooled. + while (timeout > 0) { + + if (future == null) { + // Submit the next item. Submit a new item only if the previously + // submitted one was done. Otherwise, we're bypassing. + SpoolingBufferedMutatorSubmitter submitter = + new SpoolingBufferedMutatorSubmitter(submission, bm, coordinator); + try { + future = es.submit(submitter); + } catch (RejectedExecutionException ree) { + LOG.error("Cannot submit mutation submittion.", ree); + // TODO: deal with this. 
+ } + } + + try { + future.get(timeout, TimeUnit.NANOSECONDS); + } catch (CancellationException ce) { + // We got interrupted, this means that we should yield + LOG.info( + "SpoolingBufferedMutatorProcessor got interrupted, yielding."); + closed = true; + Thread.interrupted(); + break; + } catch (ExecutionException ee) { + LOG.debug("SpoolingBufferedMutatorProcessor submission failed.", + ee.getCause()); + // TODO: determine what we do next + } catch (InterruptedException ie) { + // We got interrupted, this means that we should yield + LOG.info( + "SpoolingBufferedMutatorProcessor got interrupted, yielding."); + closed = true; + Thread.currentThread().interrupt(); + break; + } catch (TimeoutException te) { + timeout = coordinator.getTimeout(submission); + } + + // indicate that we're done with this submission. + if (future.isDone()) { + // Wipe out the future, so that we can submit the next item + future = null; + break; + } + + } // while waiting for the currentSubmission to finish + + if (submission.submissionType() == SpoolingBufferedMutatorSubmissionType.CLOSE) { + this.closed = true; + } + + // The submission is either finished, or timed out, we need to indicate it + // is time to move on + submission.readyToFlush(); + + } // while not closed + + // Call close again, it is possible for the last submission before close to + // timed out, and then close may not have been submitted. + bm.close(); + + return null; + } + +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockBufferedMutatorImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockBufferedMutatorImpl.java new file mode 100644 index 0000000..598c741 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockBufferedMutatorImpl.java @@ -0,0 +1,179 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; + +/** + * Mock BufferedMutator implementation used for testing purposes. 
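+ * <p>
+ * A minimal usage sketch (hypothetical test code; assumes the usual Bytes and
+ * JUnit static imports):
+ * <pre>
+ * MockBufferedMutatorImpl bm =
+ *     new MockBufferedMutatorImpl(TableName.valueOf("test"));
+ * bm.mutate(new Put(Bytes.toBytes("row-1")));
+ * bm.flush();
+ * assertTrue(bm.flushed());
+ * assertEquals(1, bm.getMutated().size());
+ * </pre>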
+ */ +class MockBufferedMutatorImpl implements BufferedMutator { + + private final TableName tableName; + private long writeBufferSize; + private int rpcTimeout = 0; + private int operationTimeout = 0; + + final BlockingQueue> mutated = + new LinkedBlockingQueue<>(); + private boolean closed; + private boolean flushed; + + /** + * + */ + public MockBufferedMutatorImpl(TableName tableName) { + this.tableName = tableName; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getName() + */ + public TableName getName() { + return tableName; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getConfiguration() + */ + public Configuration getConfiguration() { + // Return null, we're not interested in this method. + return null; + } + + /** + * returns all spooled mutations sofar, resets the queue, and resets the close + * and flushed attributes. + * @return the list of all mutation lists. + */ + synchronized List> getMutated() { + List> results = new LinkedList<>(); + mutated.drainTo(results); + flushed = false; + closed = false; + return results; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hbase.client.BufferedMutator#mutate(org.apache.hadoop. + * hbase.client.Mutation) + */ + public synchronized void mutate(Mutation m) throws IOException { + try { + mutated.put(Collections.singletonList(m)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while spooling.", e); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#mutate(java.util.List) + */ + public synchronized void mutate(List ms) + throws IOException { + try { + mutated.put(ms); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while spooling.", e); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#close() + */ + @Override + public synchronized void close() throws IOException { + this.closed = true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#flush() + */ + public synchronized void flush() throws IOException { + this.flushed = true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getWriteBufferSize() + */ + public long getWriteBufferSize() { + return writeBufferSize; + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#setRpcTimeout(int) + */ + public void setRpcTimeout(int timeout) { + this.rpcTimeout = timeout; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hbase.client.BufferedMutator#setOperationTimeout(int) + */ + public void setOperationTimeout(int timeout) { + this.operationTimeout = timeout; + } + + /** + * @return whether {@link #flush()} was called since the last call to + * {@link #getMutated()} was called. + */ + public boolean flushed() { + return flushed; + } + + /** + * @return whether {@link #close()} was called since the last call to + * {@link #getMutated()} was called. 
+ */ + public boolean closed() { + return closed; + } + +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockMutationSpooler.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockMutationSpooler.java new file mode 100644 index 0000000..3ea65a6 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/MockMutationSpooler.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Mock implementation that simply queues all mutations to a queue so that they + * can be later returned and inspected. + */ +class MockMutationSpooler implements MutationSpooler { + + final BlockingQueue> spooled = + new LinkedBlockingQueue<>(); + private boolean flushed = false; + private boolean closed = false; + + /** + * To be used for testing. + */ + MockMutationSpooler() { + } + + /** + * Clears the mutations and the flushed status. + * @return the list of all mutation lists. + */ + synchronized List> getSpooled() { + List> results = new LinkedList<>(); + spooled.drainTo(results); + spooled.clear(); + flushed = false; + closed = false; + return results; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hbase.client.MutationSpooler#mutate(org.apache.hadoop. 
+   * hbase.client.Mutation)
+   */
+  public synchronized void mutate(Mutation m) throws IOException {
+    try {
+      spooled.put(Collections.singletonList(m));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while spooling.", e);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(java.util.List)
+   */
+  public synchronized void mutate(List<? extends Mutation> ms)
+      throws IOException {
+    try {
+      spooled.put(ms);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while spooling.", e);
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.MutationSpooler#flush()
+   */
+  public synchronized void flush() throws IOException {
+    flushed = true;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.MutationSpooler#close()
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    closed = true;
+  }
+
+  /**
+   * @return whether this spooler has been flushed since the last call to
+   *         {@link #getSpooled()}.
+   */
+  public boolean flushed() {
+    return flushed;
+  }
+
+  /**
+   * @return whether this spooler has been closed since the last call to
+   *         {@link #getSpooled()}.
+   */
+  public boolean closed() {
+    return closed;
+  }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorCoordinator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorCoordinator.java
new file mode 100644
index 0000000..4929ce0
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorCoordinator.java
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.CLOSE;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator.State;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestSpoolingBufferedMutatorCoordinator {
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testTransitions() {
+    SpoolingBufferedMutatorCoordinator coordinator =
+        new SpoolingBufferedMutatorCoordinator();
+    coordinator.setState(State.GOOD);
+    assertTrue(coordinator.getTimeout(null) > 0);
+
+    // Legal transitions: GOOD->BAD, BAD->TRANSITIONING, TRANSITIONING->BAD,
+    // TRANSITIONING->GOOD.
+    coordinator.setState(State.BAD);
+    coordinator.setState(State.TRANSITIONING);
+    coordinator.setState(State.BAD);
+    coordinator.setState(State.TRANSITIONING);
+    coordinator.setState(State.GOOD);
+
+    // Can't do GOOD->TRANSITIONING. Use try/fail/catch here so the rest of the
+    // test still runs after the expected exception.
+    try {
+      coordinator.setState(State.TRANSITIONING);
+      fail("GOOD->TRANSITIONING should throw IllegalStateException");
+    } catch (IllegalStateException expected) {
+      // Expected; the state remains GOOD.
+    }
+
+    coordinator.setState(State.BAD);
+
+    // Can't do BAD->GOOD. This is the last statement, so the rule verifies it.
+    exception.expect(IllegalStateException.class);
+    coordinator.setState(State.GOOD);
+  }
+
+  @Test
+  public void testClose() {
+    // The timeout for a close submission should never be 0, no matter the state.
+    SpoolingBufferedMutatorCoordinator coordinator =
+        new SpoolingBufferedMutatorCoordinator();
+    coordinator.setState(State.GOOD);
+    SpoolingBufferedMutatorSubmission closeSubmission =
+        new SpoolingBufferedMutatorSubmission(CLOSE, null, 0);
+    assertTrue(coordinator.getTimeout(closeSubmission) > 0);
+
+    coordinator.setState(State.BAD);
+    assertTrue(coordinator.getTimeout(closeSubmission) > 0);
+  }
+
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorImpl.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorImpl.java
new file mode 100644
index 0000000..a8aaa9a
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestSpoolingBufferedMutatorImpl.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator.State;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests that {@link SpoolingBufferedMutatorImpl} writes through to the wrapped
+ * {@link BufferedMutator} while the coordinator reports a good state, and
+ * spools to the {@link MutationSpooler} once the state goes bad.
+ */
+@Category({ ClientTests.class, MediumTests.class })
+public class TestSpoolingBufferedMutatorImpl {
+
+  @Test
+  public void testWriteThroughAndSpooling() throws IOException {
+    TableName tableName = TableName.valueOf("someTable");
+    MockBufferedMutatorImpl wrapped = new MockBufferedMutatorImpl(tableName);
+    SpoolingBufferedMutatorCoordinator coordinator =
+        new SpoolingBufferedMutatorCoordinator();
+    MockMutationSpooler spooler = new MockMutationSpooler();
+    BufferedMutatorParams params = new BufferedMutatorParams(tableName);
+    BufferedMutator sbmi =
+        new SpoolingBufferedMutatorImpl(wrapped, coordinator, spooler, params);
+    byte[] rowkey = Bytes.toBytes("rowKey");
+    byte[] family = Bytes.toBytes("cf");
+    byte[] qualifier = Bytes.toBytes("cq1");
+    byte[] value = Bytes.toBytes("val1");
+    Put p = new Put(rowkey);
+    p.setAttribute("a1", Bytes.toBytes("v1"));
+    long timestamp = System.currentTimeMillis();
+    p.addColumn(family, qualifier, timestamp, value);
+    sbmi.mutate(p);
+
+    qualifier = Bytes.toBytes("cq2");
+    value = Bytes.toBytes("val2");
+    p.addColumn(family, qualifier, timestamp, value);
+    sbmi.mutate(p);
+
+    assertFalse(wrapped.flushed());
+    assertFalse(wrapped.closed());
+
+    sbmi.flush();
+
+    assertTrue(wrapped.flushed());
+    assertFalse(wrapped.closed());
+
+    // While the state is good, everything goes to the wrapped mutator and
+    // nothing is spooled.
+    List<List<? extends Mutation>> spooled = spooler.getSpooled();
+    assertEquals(0, spooled.size());
+
+    List<List<? extends Mutation>> mutated = wrapped.getMutated();
+    assertEquals(2, mutated.size());
+
+    coordinator.setState(State.BAD);
+
+    sbmi.mutate(p);
+    sbmi.flush();
+
+    // Once the state is bad, mutations bypass the wrapped mutator and are
+    // spooled instead.
+    mutated = wrapped.getMutated();
+    assertEquals(0, mutated.size());
+
+    spooled = spooler.getSpooled();
+    assertEquals(1, spooled.size());
+
+    sbmi.close();
+    assertTrue(spooler.closed());
+    assertTrue(wrapped.closed());
+  }
+
+}
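Usage note (illustrative, not part of the patch): the sketch below shows how the pieces added here are expected to be wired together, based only on the constructor and interfaces exercised in TestSpoolingBufferedMutatorImpl. It assumes the caller lives in the org.apache.hadoop.hbase.client package (several of the spooling classes may be package-private), the example class and method names are made up, and the spooler argument would eventually be a FilesystemMutationSpooler once that class is implemented.

package org.apache.hadoop.hbase.client;

import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative only; not part of the patch. */
public class SpoolingBufferedMutatorExample {

  /**
   * Wires a SpoolingBufferedMutatorImpl around an already-created
   * BufferedMutator, mirroring the wiring used in the tests above.
   */
  static void writeWithSpooling(BufferedMutator wrapped, MutationSpooler spooler,
      TableName tableName) throws IOException {
    BufferedMutatorParams params = new BufferedMutatorParams(tableName);
    SpoolingBufferedMutatorCoordinator coordinator =
        new SpoolingBufferedMutatorCoordinator();
    // Constructor signature as exercised by TestSpoolingBufferedMutatorImpl.
    BufferedMutator mutator =
        new SpoolingBufferedMutatorImpl(wrapped, coordinator, spooler, params);
    try {
      Put put = new Put(Bytes.toBytes("rowKey"));
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq1"), Bytes.toBytes("val1"));
      // Goes to the wrapped mutator while the coordinator reports a good state,
      // and to the spooler once the coordinator has been moved to a bad state.
      mutator.mutate(put);
      mutator.flush();
    } finally {
      mutator.close(); // closes both the wrapped mutator and the spooler
    }
  }
}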