diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index d4cdead..45152a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -38,14 +38,15 @@ public class BufferedMutatorParams { private long writeBufferSize = UNSET; private int maxKeyValueSize = UNSET; private ExecutorService pool = null; - private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { - @Override - public void onException(RetriesExhaustedWithDetailsException exception, - BufferedMutator bufferedMutator) - throws RetriesExhaustedWithDetailsException { - throw exception; - } - }; + private BufferedMutator.ExceptionListener listener = + new BufferedMutator.ExceptionListener() { + @Override + public void onException(RetriesExhaustedWithDetailsException exception, + BufferedMutator bufferedMutator) + throws RetriesExhaustedWithDetailsException { + throw exception; + } + }; public BufferedMutatorParams(TableName tableName) { this.tableName = tableName; @@ -60,9 +61,9 @@ public class BufferedMutatorParams { } /** - * Override the write buffer size specified by the provided {@link Connection}'s - * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key - * {@code hbase.client.write.buffer}. + * Override the write buffer size specified by the provided + * {@link Connection}'s {@link org.apache.hadoop.conf.Configuration} instance, + * via the configuration key {@code hbase.client.write.buffer}. */ public BufferedMutatorParams writeBufferSize(long writeBufferSize) { this.writeBufferSize = writeBufferSize; @@ -74,9 +75,9 @@ public class BufferedMutatorParams { } /** - * Override the maximum key-value size specified by the provided {@link Connection}'s - * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key - * {@code hbase.client.keyvalue.maxsize}. + * Override the maximum key-value size specified by the provided + * {@link Connection}'s {@link org.apache.hadoop.conf.Configuration} instance, + * via the configuration key {@code hbase.client.keyvalue.maxsize}. */ public BufferedMutatorParams maxKeyValueSize(int maxKeyValueSize) { this.maxKeyValueSize = maxKeyValueSize; @@ -88,8 +89,8 @@ public class BufferedMutatorParams { } /** - * Override the default executor pool defined by the {@code hbase.htable.threads.*} - * configuration values. + * Override the default executor pool defined by the + * {@code hbase.htable.threads.*} configuration values. */ public BufferedMutatorParams pool(ExecutorService pool) { this.pool = pool; @@ -101,10 +102,26 @@ public class BufferedMutatorParams { } /** - * Override the default error handler. Default handler simply rethrows the exception. + * Override the default error handler. Default handler simply rethrows the + * exception. 
*/ - public BufferedMutatorParams listener(BufferedMutator.ExceptionListener listener) { + public BufferedMutatorParams listener( + BufferedMutator.ExceptionListener listener) { this.listener = listener; return this; } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#clone() + */ + public BufferedMutatorParams clone() { + BufferedMutatorParams clone = new BufferedMutatorParams(this.tableName); + clone.writeBufferSize = this.writeBufferSize; + clone.maxKeyValueSize = maxKeyValueSize; + clone.pool = this.pool; + clone.listener = this.listener; + return clone; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java new file mode 100644 index 0000000..9a0740a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FilesystemMutationSpooler.java @@ -0,0 +1,75 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * This class is responsible to spool mutations to the configured filesystem. 
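+ * <p>
+ * The methods below are still stubs; the snippet here is only an illustrative
+ * sketch of the intended calling pattern against the {@link MutationSpooler}
+ * contract (the row, family and qualifier names are made up for the example):
+ * <pre>
+ *   MutationSpooler spooler = new FilesystemMutationSpooler();
+ *   Put put = new Put(Bytes.toBytes("row-1"));
+ *   put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
+ *   spooler.mutate(put);
+ *   spooler.flush();  // push buffered mutations to the spool
+ *   spooler.close();  // flush and release resources
+ * </pre>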
+ */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class FilesystemMutationSpooler implements MutationSpooler { + + /** + * + */ + public FilesystemMutationSpooler() { + // TODO Auto-generated constructor stub + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(org.apache.hadoop.hbase.client.Mutation) + */ + @Override + public void mutate(Mutation m) throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#mutate(java.util.List) + */ + @Override + public void mutate(List ms) throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#flush() + */ + @Override + public void flush() throws IOException { + // TODO Auto-generated method stub + + } + + /* (non-Javadoc) + * @see org.apache.hadoop.hbase.client.MutationSpooler#close() + */ + @Override + public void close() throws IOException { + // TODO Auto-generated method stub + + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java new file mode 100644 index 0000000..6d41ba4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MutationSpooler.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Implementations are responsible to spool mutations. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface MutationSpooler { + + /** + * Spool a {@link Mutation} to an external sink. Currently only supports + * {@link Put} and {@link Delete} mutations. + * + * @param mutation The data to send. + * @throws IOException if a remote or network exception occurs. + */ + public void mutate(Mutation m) throws IOException; + + /** + * Spool a {@link Mutation}s to an external sink. Currently only supports + * {@link Put} and {@link Delete} mutations. + * + * @param mutation The data to send. + * @throws IOException if a remote or network exception occurs. + */ + public void mutate(List ms) throws IOException; + + /** + * Executes all the buffered, asynchronous {@link Mutation} operations and + * waits until they are done. + * + * @throws IOException if a remote or network exception occurs. + */ + public void flush() throws IOException; + + /** + * Performs a {@link #flush()} and releases any resources held. + * + * @throws IOException if a remote or network exception occurs. 
+ */ + public void close() throws IOException; + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java new file mode 100644 index 0000000..3dffdb6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinator.java @@ -0,0 +1,59 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +interface SpoolingBufferedMutatorCoordinator { + + /** + * How much longer processing needs to wait for the current submission to be + * processed. A value of 0 indicates that HBase must be in a bad state and + * processing doesn't have to wait. Therefore, outbound items must be spooled, + * not dropped. Any value larger than 0 means that the processing must wait + * for completion for at least the specified amount of time or ask again. + * + * @param submission non-null submission. + * @return the time in {@link #getTimeUnit()} units to wait. + */ + long getTimeout(SpoolingBufferedMutatorSubmission submission); + + /** + * @return the time unit to use for waiting. + */ + TimeUnit getTimeUnit(); + + /** + * @param submission determines the flush count up to which we should spool or + * drop. + * @return whether outbound items in the batch represented by submission must + * be spooled or dropped. + */ + boolean shouldSpool(SpoolingBufferedMutatorSubmission submission); + + /** + * Report that a submission is done. + * @param result the result of the submission + * @param cause if any exceptions were thrown. + */ + void report(SpoolingBufferedMutatorSubmission result, Throwable cause); + +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinatorImpl.java new file mode 100644 index 0000000..21e0dcd --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorCoordinatorImpl.java @@ -0,0 +1,202 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * This class keeps track of HBase state, and determines how long to wait for a + * submission. + *
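+ * <p>
+ * A rough sketch of the intended state machine, derived from the {@code State}
+ * enum and {@code report(...)} below ({@code getTimeout(...)} can also force
+ * the BAD state when a submission waits longer than the submit timeout):
+ * <pre>
+ *   GOOD --flush fails--> BAD --flush succeeds--> TRANSITIONING
+ *     ^                    ^                            |
+ *     |                    +--------flush fails---------+
+ *     +---------------------flush succeeds--------------+
+ * </pre>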

+ * Note that we can get extra information from an exception handler, and that + * can help us determine that HBase is bad and/or back up again. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SpoolingBufferedMutatorCoordinatorImpl + implements SpoolingBufferedMutatorCoordinator { + + private final TimeUnit TU = MILLISECONDS; + + /** + * TODO: determine exact value, for example 100 seconds. TODO: make this + * configurable + */ + private final long submitTimeoutMillis = 100000; // 100 seconds + + /** + * The timeout increment to wait, after which we're asked again how much + * additional time to wait. We will not get control back for up to this time. + *

+ * Note: should be strictly smaller than submitTimeoutMillis. + *
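+ * <p>
+ * Illustrative only -- the processor is expected to poll in roughly this
+ * shape (the real loop lives in SpoolingBufferingMutatorProcessor):
+ * <pre>
+ *   long timeout = coordinator.getTimeout(submission);
+ *   while (timeout > 0) {
+ *     try {
+ *       result = future.get(timeout, coordinator.getTimeUnit());
+ *       break; // the submission made it into the wrapped BufferedMutator
+ *     } catch (TimeoutException te) {
+ *       timeout = coordinator.getTimeout(submission); // 0 means: spool
+ *     }
+ *   }
+ * </pre>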

+   * TODO: make this configurable
+   */
+  private final long timeoutIncrementMillis = 1000; // 1 second
+
+  /**
+   * Capture the states that HBase can be in.
+   */
+  private enum State {
+    /**
+     * HBase has some issues. Spool data, and don't wait for mutations to
+     * complete. Next states {TRANSITIONING}
+     */
+    BAD,
+    /**
+     * HBase is in a good state, wait the regular time for mutations to
+     * complete. Do not spool. Next states {BAD}
+     */
+    GOOD,
+    /**
+     * HBase connection was bad before. Still spool, because we don't know
+     * exactly which mutations went to HBase, but do wait for mutations to
+     * complete, so that we're not bypassing submitting to the BufferedMutator.
+     * Next states {BAD,GOOD}
+     */
+    TRANSITIONING;
+  }
+
+  private volatile State state = State.GOOD;
+
+  /**
+   * TODO: determine if the caller manages the lifecycle, or we do.
+   */
+  public SpoolingBufferedMutatorCoordinatorImpl() {
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator#
+   * getTimeout(org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmission)
+   */
+  public synchronized long getTimeout(
+      SpoolingBufferedMutatorSubmission submission) {
+
+    if (state == State.BAD) {
+      return 0;
+    } else {
+      // We're in GOOD or TRANSITIONING state. Wait for the submission to go
+      // through the wrapped BufferedMutator.
+
+      // TODO: perhaps we want to wait longer for larger submissions.
+      long size = submission.size();
+      long count = submission.count();
+
+      long submitTime = submission.getSubmitNanonTime();
+
+      if (submitTime <= 0) {
+        // The submission hasn't been added to the BufferedMutator yet.
+        return timeoutIncrementMillis;
+      } else {
+        long now = System.nanoTime();
+        // waitTime is in nanoseconds; submitTimeoutMillis is in milliseconds.
+        long waitTime = now - submitTime;
+        if (waitTime < TimeUnit.MILLISECONDS.toNanos(submitTimeoutMillis)) {
+          return timeoutIncrementMillis;
+        } else {
+          // We had to wait too long for this submission to go through; assume
+          // HBase is in bad shape and start spooling.
+          state = State.BAD;
+          return 0;
+        }
+      }
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator#
+   * getTimeUnit()
+   */
+  public TimeUnit getTimeUnit() {
+    return TU;
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator#
+   * shouldSpool(org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmission)
+   */
+  public boolean shouldSpool(SpoolingBufferedMutatorSubmission submission) {
+    // TODO: should we keep track of the last successful flush that went
+    // through and perhaps still drop some items from outbound, even if we
+    // just went into a bad state?
+    if (state == State.GOOD) {
+      return false;
+    } else {
+      // Spool for both BAD and TRANSITIONING states.
+      return true;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see
+   * org.apache.hadoop.hbase.client.SpoolingBufferedMutatorCoordinator#report(
+   * org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmission,
+   * java.lang.Throwable)
+   */
+  public void report(SpoolingBufferedMutatorSubmission result,
+      Throwable cause) {
+    // TODO: put intelligence here to glean info from the cause to determine
+    // if HBase is down, and how severe the problem is.
+
+    switch (state) {
+    case BAD:
+      if (cause == null) {
+        state = State.TRANSITIONING;
+      } else {
+        // TODO: stay in bad state; perhaps capture the time and count of
+        // remaining in bad state.
+      }
+      break;
+    case GOOD:
+      if (cause == null) {
+        // TODO: remain in good state.
Perhaps capture count and time in good + // state. + } else { + // TODO: are we going to transition to bad state upon one single error? + state = State.BAD; + } + break; + case TRANSITIONING: + if (cause == null) { + // TODO: determine when to transition back to good. Presumably we want 2 + // or more good flushes before we transition back to good. + state = State.GOOD; + } else { + state = State.BAD; + } + break; + } + + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java new file mode 100644 index 0000000..160a6f4 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorFlusher.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Used to Flush one batch of mutations from the outbound queue. Mutations can + * be spooled or dropped. + */ +public class SpoolingBufferedMutatorFlusher + implements Callable { + + private final BlockingQueue outbound; + private final SpoolingBufferedMutatorCoordinator coordinator; + + private final MutationSpooler spooler; + private final SpoolingBufferedMutatorSubmission submission; + + /** + * @param submission the {@link SpoolingBufferedMutatorSubmissionType#FLUSH} + * or {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submission + * to flush out of the outbound queue. We need to wait on the + * submission to be ready to spool. It may not have made it in the + * queue yet, and/or we may not yet have decided whether to drop or + * spool this batch of mutations. Cannot be a mutate submission. + * Cannot be null. + * @param outbound the queue from which to flush submissions. + * @param coordinator that helps determine whether itesms should be spooled or + * dropped. + * @param spooler used to flush, if the coordinator indicates so. + */ + public SpoolingBufferedMutatorFlusher( + SpoolingBufferedMutatorSubmission submission, + BlockingQueue outbound, + SpoolingBufferedMutatorCoordinator coordinator, MutationSpooler spooler) { + // TODO: validate inputs + this.submission = submission; + this.outbound = outbound; + this.coordinator = coordinator; + this.spooler = spooler; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.Callable#call() + */ + public SpoolingBufferedMutatorSubmission call() throws Exception { + + // TODO: configure the wait time for the flush to go through and decide what + // to do if we time out. 
+ long timeout = Long.MAX_VALUE; + TimeUnit tu = TimeUnit.SECONDS; + + // We need to wait because the BufferedMutatorImpl has enqueued the flush, + // but we don't yet know if it has been processed and whether it was + // successfully submitted. + submission.waitToFlush(timeout, tu); + + // Flush items up to this flushCount, but none later. Items could have been + // enqueued into inbound (and therefore outbound) out of order. + long maxFlushCount = submission.flushCount(); + + // Coordinator determines what to do. + boolean shouldSpool = coordinator.shouldSpool(submission); + + // We cannot quite drain the entire queue because a) out of order items + // might be in there, and b) items could be added after we started + // iterating. + int outboundSize = outbound.size(); + + SpoolingBufferedMutatorSubmission s; + for (int i = outboundSize; i > 0; i--) { + // TODO: should we use poll and possibly time out? What to do if we do + // time out? + s = outbound.take(); + if (s.flushCount() > maxFlushCount) { + // Put it back for later processing. + outbound.put(s); + } else { + // TODO: should we always close, no matter whether we should flush or + // not? + if (shouldSpool) { + switch (s.submissionType()) { + case CLOSE: + spooler.close(); + break; + case FLUSH: + spooler.flush(); + break; + case MUTATE: + spooler.mutate(s.mutations()); + break; + default: + // TODO: log some sort of error message. + break; + } + } else { + // Do no spool, just drop the record on the ground. + } + // TODO: the flusher should probably not drop any flushes other than the + // its "own" flush. + } + } // for loop over outbound queue + + // TODO: perhaps add some debugging with counts here. + + // Return so that the caller (SpoolingBufferedMutatorImpl) knows this flush + // or close have been completed.. + return submission; + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java new file mode 100644 index 0000000..d3b2552 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorImpl.java @@ -0,0 +1,352 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.CLOSE; +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.FLUSH; +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.MUTATE; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * + */ +/** + * @author joep + * + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class SpoolingBufferedMutatorImpl implements BufferedMutator { + + private static final Log LOG = + LogFactory.getLog(SpoolingBufferedMutatorImpl.class); + + /** + * Queue to hand off to submission. + */ + @VisibleForTesting + final BlockingQueue inbound = + new LinkedBlockingQueue<>(); + + /** + * Queue to track submissions that need to possibly be spooled. + */ + @VisibleForTesting + final BlockingQueue outbound = + new LinkedBlockingQueue<>(); + + private final BufferedMutator wrapped; + private volatile boolean closed = false; + + /** + * Track the number of flushes we see, so that a batch of submissions are + * marked with the same flushCount. Note that multiple threads can mutate, so + * mutations of different flushCounts could be queued out of order. + */ + private final AtomicLong flushCount = new AtomicLong(0); + + private final SpoolingBufferedMutatorCoordinator coordinator; + + /** + * Only one single process will be running here (at a time). The executor + * service will re-spawn a thread if it crashes. + */ + ExecutorService processES = Executors.newSingleThreadExecutor(); + + /** + * Used to push submissions into the wrapped BufferedMutator. + */ + ExecutorService submitES = Executors.newSingleThreadExecutor(); + + /** + * Used to run flush requests, one per flush. + */ + ExecutorService flushES = Executors.newSingleThreadExecutor(); + + /** + * Hang on to this, in order to be able to cancel. + */ + private final Future processResult; + + private final MutationSpooler spooler; + + /** + * + * @param bm the buffered mutator that will be delegated to. + * @param c + */ + SpoolingBufferedMutatorImpl(BufferedMutator bm, + SpoolingBufferedMutatorCoordinator c, MutationSpooler spooler) { + if (bm == null) { + throw new IllegalArgumentException( + "Wrapped BufferedMutator cannot be null."); + } + this.wrapped = bm; + if (c == null) { + this.coordinator = new SpoolingBufferedMutatorCoordinatorImpl(); + } else { + this.coordinator = c; + } + + this.spooler = spooler; + + SpoolingBufferingMutatorProcessor processor = + new SpoolingBufferingMutatorProcessor(inbound, outbound, wrapped, + submitES, coordinator); + + // TODO: determine if we should create executor services, or let these be + // managed in the underlying structures. 
+ + processResult = processES.submit(processor); + } + + /** + * Requests a BufferedMutator from the passed connection and wraps it. + * + * @param conn + * @param rpcCallerFactory ignored. + * @param rpcFactory ignored + * @param params used to pass to connection, minus the class name that causes + * it to create this SpoolingBufferedMutator + * @throws IOException + */ + SpoolingBufferedMutatorImpl(ClusterConnection conn, + RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcFactory, BufferedMutatorParams params) + throws IOException { + + BufferedMutatorParams clonedParams = null; + if (params != null) { + clonedParams = params.clone(); + } + // Wipe out the reference to this class to avoid an infinite loop + // TODO: uncomment the following line once patch for HBASE-17277 goes in. + // clonedParams.implementationClassName(null); + + // TODO: is it possible to have the BufferedMutator creation to hang / fail + // and therefore end up in a bad state to begin with? If so, we should + // change the signature and let the Processor or Submitter create the + // connection, perhaps as a separate SubmissionType. That way we won't stall + // if we want to connect to an HBase cluster that is already down, and we'd + // start spooling right away. + // TODO: Similarly, we probably want to ensure that we can connect (?) to + // the spooling filesystem. + this.wrapped = conn.getBufferedMutator(clonedParams); + this.coordinator = new SpoolingBufferedMutatorCoordinatorImpl(); + + // TODO: read some config variable to make this pluggable. + spooler = new FilesystemMutationSpooler(); + + SpoolingBufferingMutatorProcessor processor = + new SpoolingBufferingMutatorProcessor(inbound, outbound, wrapped, + submitES, coordinator); + + // TODO: determine if we should create executor services, or let these be + // managed in the underlying structures. + + processResult = processES.submit(processor); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getName() + */ + @Override + public TableName getName() { + return wrapped.getName(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getConfiguration() + */ + @Override + public Configuration getConfiguration() { + return wrapped.getConfiguration(); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hbase.client.BufferedMutator#mutate(org.apache.hadoop. 
+   * hbase.client.Mutation)
+   */
+  @Override
+  public void mutate(Mutation m) throws IOException {
+    mutate(Collections.singletonList(m));
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.BufferedMutator#mutate(java.util.List)
+   */
+  @Override
+  public void mutate(List<? extends Mutation> ms) throws IOException {
+    if (closed) {
+      throw new IllegalStateException(
+          "Cannot put when the BufferedMutator is closed.");
+    }
+
+    if (ms == null) {
+      return;
+    }
+
+    SpoolingBufferedMutatorSubmission s =
+        new SpoolingBufferedMutatorSubmission(MUTATE, ms, flushCount.get());
+    try {
+      inbound.put(s);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while trying to mutate.", e);
+      // Restore the interrupt status so callers can see we were interrupted.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.BufferedMutator#close()
+   */
+  @Override
+  public synchronized void close() throws IOException {
+
+    if (closed) {
+      return;
+    }
+    SpoolingBufferedMutatorSubmission s =
+        new SpoolingBufferedMutatorSubmission(CLOSE, null, flushCount.get());
+    try {
+      inbound.put(s);
+
+      waitForFlusher(s);
+
+      // TODO: do we need to do anything with processResult ?
+
+      // TODO: ensure that all executors are wrapped up.
+
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while trying to close.", e);
+      // Restore the interrupt status so callers can see we were interrupted.
+      Thread.currentThread().interrupt();
+    } finally {
+      this.closed = true;
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   *
+   * @see org.apache.hadoop.hbase.client.BufferedMutator#flush()
+   */
+  @Override
+  public synchronized void flush() throws IOException {
+    if (closed) {
+      throw new IllegalStateException(
+          "Cannot flush when the BufferedMutator is closed.");
+    }
+    SpoolingBufferedMutatorSubmission s = new SpoolingBufferedMutatorSubmission(
+        FLUSH, null, flushCount.incrementAndGet());
+    try {
+      inbound.put(s);
+      waitForFlusher(s);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while trying to flush.", e);
+      // Restore the interrupt status so callers can see we were interrupted.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Wait for the flush (or close) to be submitted, or time out.
+   * @param s the flush or close submission
+   * @throws InterruptedException
+   */
+  private void waitForFlusher(SpoolingBufferedMutatorSubmission s)
+      throws InterruptedException {
+    SpoolingBufferedMutatorFlusher flusher =
+        new SpoolingBufferedMutatorFlusher(s, outbound, coordinator, spooler);
+    // Wait for the flush or close to complete before returning.
+    Future<SpoolingBufferedMutatorSubmission> result = flushES.submit(flusher);
+
+    // TODO: properly handle all exceptions and deal with timeouts.
+    try {
+      // Block until this flush is done.
+ result.get(); + } catch (ExecutionException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#getWriteBufferSize() + */ + @Override + public long getWriteBufferSize() { + return wrapped.getWriteBufferSize(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.hadoop.hbase.client.BufferedMutator#setRpcTimeout(int) + */ + @Override + public void setRpcTimeout(int timeout) { + wrapped.setOperationTimeout(timeout); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hbase.client.BufferedMutator#setOperationTimeout(int) + */ + @Override + public void setOperationTimeout(int timeout) { + wrapped.setOperationTimeout(timeout); + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java new file mode 100644 index 0000000..efdee62 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmission.java @@ -0,0 +1,230 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.SpoolingBufferedMutatorSubmissionType.MUTATE; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Used to track mutations, flush, and close actions on the + * {@link SpoolingBufferedMutatorImpl} + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class SpoolingBufferedMutatorSubmission { + + private final SpoolingBufferedMutatorSubmissionType submissionType; + private final List mutations; + private final long flushCount; + /** + * The cumulative heap-size of the list of mutations. + */ + private final long size; + /** + * The number of mutations that this submission consists of. + */ + private final long count; + + private volatile long submitNanonTime = 0; + private volatile long completionNanonTime = 0; + + /** + * Used so that the spooler thread known whether this flush or close mutation + * has made it through submission (or timed out). 
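+ * <p>
+ * The handshake, roughly: the processor side is expected to call
+ * {@code readyToFlush()} once a FLUSH or CLOSE submission has been handed to
+ * the wrapped BufferedMutator (or it has been decided to spool it), while the
+ * flusher blocks in {@code waitToFlush(timeout, unit)} until that happens or
+ * the wait times out:
+ * <pre>
+ *   // flusher thread                          // processor thread
+ *   submission.waitToFlush(timeout, unit);     submission.readyToFlush();
+ * </pre>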
+ */ + private final CountDownLatch flushLatch; + + /** + * @param submissionType non-null type + * @param mutations Cannot be null for + * {@link SpoolingBufferedMutatorSubmissionType#MUTATE}, but is + * ignored for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} + * and {@link SpoolingBufferedMutatorSubmissionType#CLOSE} + * @param flushCount indicating which batch this submission is part of + */ + public SpoolingBufferedMutatorSubmission( + SpoolingBufferedMutatorSubmissionType submissionType, + List mutations, long flushCount) { + if (submissionType == null) { + throw new IllegalArgumentException("Submissiontype cannot be null."); + } + this.submissionType = submissionType; + + if (submissionType == MUTATE) { + if (mutations == null) { + throw new IllegalArgumentException("Cannot mutate nothing."); + } + this.mutations = mutations; + long toAddSize = 0; + int toAddCount = 0; + for (Mutation m : mutations) { + toAddSize += m.heapSize(); + ++toAddCount; + } + size = toAddSize; + count = toAddCount; + flushLatch = null; + } else { + this.mutations = null; + this.size = 0; + this.count = 0; + flushLatch = new CountDownLatch(1); + } + + this.flushCount = flushCount; + } + + public SpoolingBufferedMutatorSubmissionType submissionType() { + return submissionType; + } + + /** + * @return mutations. Cannot be null for + * {@link SpoolingBufferedMutatorSubmissionType#MUTATE}, but is null + * for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and + * {@link SpoolingBufferedMutatorSubmissionType#CLOSE} + */ + public List mutations() { + return mutations; + } + + public long flushCount() { + return flushCount; + } + + /** + * @return the total heap size of all mutations. + */ + public long size() { + return size; + } + + /** + * @return the number of mutations. + */ + public long count() { + return count; + } + + /** + * Use the caller's thread to submit. Note that this can block. + * @param bm the buffered mutator on which to submit this. Cannot be null. + * @throws IOException if a remote or network exception occurs. + */ + public void submit(BufferedMutator bm) throws IOException { + submitNanonTime = System.nanoTime(); + completionNanonTime = 0; + switch (submissionType) { + case CLOSE: + bm.close(); + break; + case FLUSH: + bm.flush(); + break; + + case MUTATE: + bm.mutate(mutations); + break; + + default: + break; + } + completionNanonTime = System.nanoTime(); + } + + /** + * @return the submitNanonTime. Can be 0 if not yet submitted. + */ + public long getSubmitNanonTime() { + return submitNanonTime; + } + + /** + * @return the completionNanonTime. Can be 0 when not yet completed. + */ + public long getCompletionNanonTime() { + return completionNanonTime; + } + + /** + * To be called by the processor to indicate that this submission is ready to + * be spooled, whether that is actually spooling or dropping. + */ + public void readyToFlush() { + if (flushLatch != null) { + flushLatch.countDown(); + } + } + + /** + * Causes the current thread to wait until {@link #readyToFlush()} is called + * for {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and + * {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submissions, unless the + * thread is interrupted, or the specified waiting time elapses. + * + * For {@link SpoolingBufferedMutatorSubmissionType#MUTATE} then this method + * returns immediately with the value true. + * + * If {@link #readyToFlush()} has already been called then this method returns + * immediately with the value true. 
+ * + * For {@link SpoolingBufferedMutatorSubmissionType#FLUSH} and + * {@link SpoolingBufferedMutatorSubmissionType#CLOSE} submissions, if + * {@link #readyToFlush()} has not yet been called then the current thread + * becomes disabled for thread scheduling purposes and lies dormant until one + * of three things happen: + * + * The {@link #readyToFlush()} is called; or Some other thread interrupts the + * current thread; or The specified waiting time elapses. If + * {@link #readyToFlush()} is called then the method returns with the value + * true. + * + * If the current thread: + * + * has its interrupted status set on entry to this method; or is interrupted + * while waiting, then InterruptedException is thrown and the current thread's + * interrupted status is cleared. If the specified waiting time elapses then + * the value false is returned. If the time is less than or equal to zero, the + * method will not wait at all. + * + * Parameters: Throws: InterruptedException - if the current thread is + * interrupted while waiting + * @param timeout the maximum time to wait + * @param unit the time unit of the timeout argument + * @return Returns: true for + * {@link SpoolingBufferedMutatorSubmissionType#MUTATE} and otherwise + * if {@link #readyToFlush()} has been called and false if the waiting + * time elapsed before the count reached zero + * @throws InterruptedException + */ + public boolean waitToFlush(long timeout, TimeUnit unit) + throws InterruptedException { + if (flushLatch == null) { + // Should happen only for mutations + return true; + } else { + return flushLatch.await(timeout, unit); + } + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java new file mode 100644 index 0000000..1c2b696 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmissionType.java @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Used to indicte what this particular submission is intended for. + */ +enum SpoolingBufferedMutatorSubmissionType { + + /** + * Indicate that a mutation is submitted. + */ + MUTATE, + /** + * Indicate that buffers should be flushed. + */ + FLUSH, + /** + * Indicate that mutators and output should be closed. 
+ */ + CLOSE; + + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java new file mode 100644 index 0000000..e0ecb02 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferedMutatorSubmitter.java @@ -0,0 +1,62 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * This class is responsible for submitting mutations to the BufferedMutator + * that could potentially hang for a while. This allows the code that submits + * this Callable to make progress and determine to spool when the + * BufferedMutator takes too long. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class SpoolingBufferedMutatorSubmitter + implements Callable { + + private final BufferedMutator bm; + private final SpoolingBufferedMutatorSubmission submission; + + /** + * + * @param mutation to be submitted to the BufferedMutator + * @param bm where to submit mutations to + * @param flushCount Indicator of the flushCount that this mutation is part + * of. + */ + public SpoolingBufferedMutatorSubmitter( + SpoolingBufferedMutatorSubmission submission, BufferedMutator bm) { + this.submission = submission; + this.bm = bm; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.Callable#call() + */ + @Override + public SpoolingBufferedMutatorSubmission call() throws Exception { + // Delegate this to the submission in order to track timing + submission.submit(bm); + return submission; + } + +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java new file mode 100644 index 0000000..3249c8f --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SpoolingBufferingMutatorProcessor.java @@ -0,0 +1,160 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * The processor is responsible to take items from the inbound queue, present + * them to the submitter, and enqueue them into the outbound queue. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class SpoolingBufferingMutatorProcessor implements Callable { + + private static final Log LOG = + LogFactory.getLog(SpoolingBufferingMutatorProcessor.class); + + final BlockingQueue inbound; + final BlockingQueue outbound; + + private final BufferedMutator bm; + private final ExecutorService es; + private final SpoolingBufferedMutatorCoordinator coordinator; + + private volatile boolean stopped = false; + + /** + * @param inbound queue where submissions are entered on + * @param outbound where submissions are stashed until they can get spooled or + * dropped (once we can confirm that all mutations were successfully + * flushed). + * @param bm the buffered mutator on which the + * {@link SpoolingBufferedMutatorSubmitter} should submit items + * @param es used to submit {@link SpoolingBufferedMutatorSubmitter} tasks to + * talk to the wrapped BufferedMutator. The caller is to manage the + * lifecycle for this ExecutorService. + * + */ + public SpoolingBufferingMutatorProcessor( + BlockingQueue inbound, + BlockingQueue outbound, + BufferedMutator bm, ExecutorService es, + SpoolingBufferedMutatorCoordinator coordinator) { + // TODO: check arguments + this.inbound = inbound; + this.outbound = outbound; + this.bm = bm; + this.es = es; + this.coordinator = coordinator; + } + + /* + * (non-Javadoc) + * + * @see java.util.concurrent.Callable#call() + */ + @Override + public Void call() throws Exception { + + SpoolingBufferedMutatorSubmission currentSubmission = null; + // Time in nanoseconds for which to remain waiting for the future to + // complete + long timeout = 1; + Future future = null; + while (!stopped) { + SpoolingBufferedMutatorSubmission newSubmission = inbound.take(); + + outbound.put(newSubmission); + + // This indicates that we're ready to submit the next item. + if (currentSubmission == null) { + currentSubmission = newSubmission; + timeout = coordinator.getTimeout(currentSubmission); + // Submit the next item + SpoolingBufferedMutatorSubmitter submitter = + new SpoolingBufferedMutatorSubmitter(newSubmission, bm); + try { + future = es.submit(submitter); + } catch (RejectedExecutionException ree) { + LOG.error("Cannot submit mutation submittion.", ree); + // TODO: deal with this. 
+        }
+      }
+
+      // Keep waiting until we're told to no longer wait.
+      // Not waiting when the future isn't done means we're moving submissions
+      // to the outbound queue without submitting them, in other words when
+      // they need to be spooled.
+      while (timeout > 0) {
+
+        Throwable cause = null;
+        SpoolingBufferedMutatorSubmission result = null;
+        try {
+          result = future.get(timeout, coordinator.getTimeUnit());
+        } catch (CancellationException ce) {
+          // The submission task was cancelled; stop processing.
+          LOG.info(
+              "SpoolingBufferedMutatorProcessor got cancelled, yielding.");
+          stopped = true;
+          break;
+        } catch (ExecutionException ee) {
+          LOG.debug("SpoolingBufferedMutatorProcessor submission failed.",
+              ee.getCause());
+          cause = ee.getCause();
+        } catch (InterruptedException ie) {
+          // We got interrupted, this means that we should yield.
+          LOG.info(
+              "SpoolingBufferedMutatorProcessor got interrupted, yielding.");
+          stopped = true;
+          // Restore the interrupt status.
+          Thread.currentThread().interrupt();
+          break;
+        } catch (TimeoutException te) {
+          timeout = coordinator.getTimeout(currentSubmission);
+        }
+
+        // Report to the coordinator and indicate that we're done with this
+        // submission.
+        if (future.isDone()) {
+          coordinator.report(result, cause);
+          currentSubmission = null;
+          break;
+        }
+
+      } // while waiting for the currentSubmission to finish
+
+    } // while not stopped
+    // The only way to get here is when we need to shut down.
+    bm.close();
+    return null;
+  }
+
+}
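Taken together, the new classes wire up roughly as follows. This is a minimal, illustrative sketch only: it assumes an existing Connection named connection, uses made-up table/row/column names, and relies on the package-private SpoolingBufferedMutatorImpl constructor, so it would only compile from within org.apache.hadoop.hbase.client (for example a unit test). Once the implementation-class plumbing from HBASE-17277 is in place, the intended route is through BufferedMutatorParams instead.

    // Inside org.apache.hadoop.hbase.client (e.g. a unit test).
    BufferedMutatorParams params =
        new BufferedMutatorParams(TableName.valueOf("t1"));
    BufferedMutator inner = connection.getBufferedMutator(params);

    BufferedMutator mutator = new SpoolingBufferedMutatorImpl(
        inner,                                        // wrapped BufferedMutator
        new SpoolingBufferedMutatorCoordinatorImpl(), // decides wait vs. spool
        new FilesystemMutationSpooler());             // sink for spooled data

    Put put = new Put(Bytes.toBytes("row-1"));
    put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    mutator.mutate(put);
    mutator.flush();  // blocks until the flush is submitted or times out
    mutator.close();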