diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
new file mode 100644
index 0000000..9ee63ff
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRowProcessorEndpoint.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RowProcessor;
+
+/**
+ * This class demonstrates how to implement atomic read-modify-writes
+ * using {@link HRegion#processRowsWithLocks()} and Coprocessor endpoints.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public abstract class BaseRowProcessorEndpoint extends BaseEndpointCoprocessor
+    implements RowProcessorProtocol {
+
+  /**
+   * Pass a processor to HRegion to process multiple rows atomically.
+   *
+   * The RowProcessor implementations should be the inner classes of your
+   * RowProcessorEndpoint so that they can be class-loaded together with
+   * the Coprocessor endpoint.
+   *
+   * See {@link TestRowProcessorEndpoint} for an example.
+   *
+   * @param processor The object that defines the read-modify-write procedure
+   * @return The processing result
+   */
+  @Override
+  public <T> T process(RowProcessor<T> processor)
+      throws IOException {
+    HRegion region =
+        ((RegionCoprocessorEnvironment) getEnvironment()).getRegion();
+    region.processRowsWithLocks(processor);
+    return processor.getResult();
+  }
+
+}
diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java
deleted file mode 100644
index 75c7768..0000000
--- src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.coprocessor; - -import java.io.IOException; -import java.util.List; -import java.util.UUID; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.io.Writable; - -/** - * Defines the procedure to atomically perform multiple scans and mutations - * on one single row. The generic type parameter T is the return type of - * RowProcessor.getResult(). - */ -@InterfaceAudience.Public -public interface RowProcessor extends Writable { - - /** - * Which row to perform the read-write - */ - byte[] getRow(); - - /** - * Obtain the processing result - */ - T getResult(); - - /** - * Is this operation read only? If this is true, process() should not add - * any mutations or it throws IOException. - * @return ture if read only operation - */ - boolean readOnly(); - - /** - * HRegion calls this to process a row. You should override this to create - * your own RowProcessor. - * - * @param now the current system millisecond - * @param scanner the call back object the can be used to scan the row - * @param mutations the mutations for HRegion to do - * @param walEdit the wal edit here allows inject some other meta data - */ - void process(long now, - RowProcessor.RowScanner scanner, - List mutations, - WALEdit walEdit) throws IOException; - - /** - * The call back provided by HRegion to perform the scans on the row - */ - public interface RowScanner { - /** - * @param scan The object defines what to read - * @param result The scan results will be added here - */ - void doScan(Scan scan, List result) throws IOException; - } - - /** - * @return The replication cluster id. - */ - UUID getClusterId(); - -} diff --git src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java new file mode 100644 index 0000000..c670c39 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/coprocessor/RowProcessorProtocol.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RowProcessor; + +/** + * Defines a protocol to perform multi row transactions. 
+ * See {@link BaseRowProcessorEndpoint} for the implementation.
+ * See {@link HRegion#processRowsWithLocks()} for details.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface RowProcessorProtocol extends CoprocessorProtocol {
+
+  /**
+   * @param processor The processor that defines how to process the row(s)
+   */
+  <T> T process(RowProcessor<T> processor) throws IOException;
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
new file mode 100644
index 0000000..c407f98
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/BaseRowProcessor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Base class for RowProcessor with some default implementations.
+ */
+public abstract class BaseRowProcessor<T> implements RowProcessor<T> {
+
+  @Override
+  public T getResult() {
+    return null;
+  }
+
+  @Override
+  public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
+  }
+
+  @Override
+  public void postProcess(HRegion region, WALEdit walEdit) throws IOException {
+  }
+
+  @Override
+  public UUID getClusterId() {
+    return HConstants.DEFAULT_CLUSTER_ID;
+  }
+
+}
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 4efdc6b..e0d09df 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadFactory;
@@ -95,7 +97,6 @@ import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
-import org.apache.hadoop.hbase.coprocessor.RowProcessor;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -232,7 +233,11 @@ public class HRegion implements HeapSize { // , Writable{
   final Configuration conf;
   final int rowLockWaitDuration;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
-  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 10 * 1000L;
+
+  // A negative number indicates an infinite timeout
+  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
+  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
+
   final HRegionInfo regionInfo;
   final Path regiondir;
   KeyValue.KVComparator comparator;
@@ -486,6 +491,10 @@
         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
         HConstants.LATEST_TIMESTAMP);
 
+    /**
+     * Timeout for the process time in processRowsWithLocks().
+     * Use -1 to switch off the time bound.
+     */
     this.rowProcessorTimeout = conf.getLong(
         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
@@ -1676,7 +1685,7 @@
   /*
    * @param delete The passed delete is modified by this method. WARNING!
    */
-  private void prepareDelete(Delete delete) throws IOException {
+  void prepareDelete(Delete delete) throws IOException {
     // Check to see if this is a deleteRow insert
     if(delete.getFamilyMap().isEmpty()){
       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
@@ -1748,7 +1757,7 @@
    * @param now
    * @throws IOException
    */
-  private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
+  void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
       throws IOException {
     Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
@@ -2367,7 +2376,7 @@
    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
    * with the provided current timestamp.
*/ - private void updateKVTimestamps( + void updateKVTimestamps( final Iterable> keyLists, final byte[] now) { for (List keys: keyLists) { if (keys == null) continue; @@ -2591,7 +2600,7 @@ public class HRegion implements HeapSize { // , Writable{ * Check the collection of families for validity. * @throws NoSuchColumnFamilyException if a family does not exist. */ - private void checkFamilies(Collection families) + void checkFamilies(Collection families) throws NoSuchColumnFamilyException { for (byte[] family : families) { checkFamily(family); @@ -2601,7 +2610,7 @@ public class HRegion implements HeapSize { // , Writable{ checkTimestamps(p.getFamilyMap(), now); } - private void checkTimestamps(final Map> familyMap, + void checkTimestamps(final Map> familyMap, long now) throws DoNotRetryIOException { if (timestampSlop == HConstants.LATEST_TIMESTAMP) { return; @@ -4232,243 +4241,123 @@ public class HRegion implements HeapSize { // , Writable{ */ public void mutateRowsWithLocks(Collection mutations, Collection rowsToLock) throws IOException { - boolean flush = false; - - startRegionOperation(); - List acquiredLocks = null; - try { - // 1. run all pre-hooks before the atomic operation - // if any pre hook indicates "bypass", bypass the entire operation - - // one WALEdit is used for all edits. - WALEdit walEdit = new WALEdit(); - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { - // by pass everything - return; - } - } else if (m instanceof Delete) { - Delete d = (Delete) m; - prepareDelete(d); - if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) { - // by pass everything - return; - } - } - } - } - - long txid = 0; - boolean walSyncSuccessful = false; - boolean locked = false; - - // 2. acquire the row lock(s) - acquiredLocks = new ArrayList(rowsToLock.size()); - for (byte[] row : rowsToLock) { - // attempt to lock all involved rows, fail if one lock times out - Integer lid = getLock(null, row, true); - if (lid == null) { - throw new IOException("Failed to acquire lock on " - + Bytes.toStringBinary(row)); - } - acquiredLocks.add(lid); - } - - // 3. acquire the region lock - this.updatesLock.readLock().lock(); - locked = true; - - // 4. Get a mvcc write number - MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert(); - - long now = EnvironmentEdgeManager.currentTimeMillis(); - byte[] byteNow = Bytes.toBytes(now); - try { - // 5. Check mutations and apply edits to a single WALEdit - for (Mutation m : mutations) { - if (m instanceof Put) { - Map> familyMap = m.getFamilyMap(); - checkFamilies(familyMap.keySet()); - checkTimestamps(familyMap, now); - updateKVTimestamps(familyMap.values(), byteNow); - } else if (m instanceof Delete) { - Delete d = (Delete) m; - prepareDelete(d); - prepareDeleteTimestamps(d, byteNow); - } else { - throw new DoNotRetryIOException( - "Action must be Put or Delete. But was: " - + m.getClass().getName()); - } - if (m.getWriteToWAL()) { - addFamilyMapToWALEdit(m.getFamilyMap(), walEdit); - } - } - - // 6. append all edits at once (don't sync) - if (walEdit.size() > 0) { - txid = this.log.appendNoSync(regionInfo, - this.htableDescriptor.getName(), walEdit, - HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor); - } - - // 7. 
apply to memstore - long addedSize = 0; - for (Mutation m : mutations) { - addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w); - } - flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize)); - - // 8. release region and row lock(s) - this.updatesLock.readLock().unlock(); - locked = false; - if (acquiredLocks != null) { - for (Integer lid : acquiredLocks) { - releaseRowLock(lid); - } - acquiredLocks = null; - } - - // 9. sync WAL if required - if (walEdit.size() > 0 && - (this.regionInfo.isMetaRegion() || - !this.htableDescriptor.isDeferredLogFlush())) { - this.log.sync(txid); - } - walSyncSuccessful = true; - - // 10. advance mvcc - mvcc.completeMemstoreInsert(w); - w = null; - - // 11. run coprocessor post host hooks - // after the WAL is sync'ed and all locks are released - // (similar to doMiniBatchPut) - if (coprocessorHost != null) { - for (Mutation m : mutations) { - if (m instanceof Put) { - coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL()); - } else if (m instanceof Delete) { - coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL()); - } - } - } - } finally { - // 12. clean up if needed - if (!walSyncSuccessful) { - int kvsRolledback = 0; - for (Mutation m : mutations) { - for (Map.Entry> e : m.getFamilyMap() - .entrySet()) { - List kvs = e.getValue(); - byte[] family = e.getKey(); - Store store = getStore(family); - // roll back each kv - for (KeyValue kv : kvs) { - store.rollback(kv); - kvsRolledback++; - } - } - } - LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback - + " KeyValues"); - } - - if (w != null) { - mvcc.completeMemstoreInsert(w); - } - if (locked) { - this.updatesLock.readLock().unlock(); - } + MultiRowMutationProcessor proc = + new MultiRowMutationProcessor(mutations, rowsToLock); + processRowsWithLocks(proc, -1); + } - if (acquiredLocks != null) { - for (Integer lid : acquiredLocks) { - releaseRowLock(lid); - } - } - } - } finally { - if (flush) { - // 13. Flush cache if needed. Do it outside update lock. - requestFlush(); - } - closeRegionOperation(); - } + /** + * Performs atomic multiple reads and writes on a given row. + * + * @param processor The object defines the reads and writes to a row. + */ + public void processRowsWithLocks(RowProcessor processor) + throws IOException { + processRowsWithLocks(processor, rowProcessorTimeout); } /** * Performs atomic multiple reads and writes on a given row. + * * @param processor The object defines the reads and writes to a row. + * @param timeout The timeout of the processor.process() execution + * Use a negative number to switch off the time bound */ - public void processRow(RowProcessor processor) + public void processRowsWithLocks(RowProcessor processor, long timeout) throws IOException { - byte[] row = processor.getRow(); - checkRow(row, "processRow"); + + for (byte[] row : processor.getRowsToLock()) { + checkRow(row, "processRowsWithLocks"); + } if (!processor.readOnly()) { checkReadOnly(); } checkResources(); - MultiVersionConsistencyControl.WriteEntry writeEntry = null; - startRegionOperation(); + WALEdit walEdit = new WALEdit(); + // 1. 
Run pre-process hook + processor.preProcess(this, walEdit); + + // Short circuit the read only case + if (processor.readOnly()) { + try { + long now = EnvironmentEdgeManager.currentTimeMillis(); + doProcessRowWithTimeout( + processor, now, this, null, null, timeout); + processor.postProcess(this, walEdit); + } finally { + closeRegionOperation(); + } + return; + } + + MultiVersionConsistencyControl.WriteEntry writeEntry = null; boolean locked = false; boolean walSyncSuccessful = false; - Integer rowLockID = null; + List acquiredLocks = null; long addedSize = 0; List mutations = new ArrayList(); + Collection rowsToLock = processor.getRowsToLock(); try { - // 1. Row lock - rowLockID = getLock(null, row, true); - - // 2. Region lock + // 2. Acquire the row lock(s) + acquiredLocks = new ArrayList(rowsToLock.size()); + for (byte[] row : rowsToLock) { + // Attempt to lock all involved rows, fail if one lock times out + Integer lid = getLock(null, row, true); + if (lid == null) { + throw new IOException("Failed to acquire lock on " + + Bytes.toStringBinary(row)); + } + acquiredLocks.add(lid); + } + // 3. Region lock this.updatesLock.readLock().lock(); locked = true; + long now = EnvironmentEdgeManager.currentTimeMillis(); try { - // 3. Let the processor scan the row and generate mutations - WALEdit walEdits = new WALEdit(); - doProcessRowWithTimeout(processor, now, rowScanner, mutations, - walEdits, rowProcessorTimeout); - if (processor.readOnly() && !mutations.isEmpty()) { - throw new IOException( - "Processor is readOnly but generating mutations on row:" + - Bytes.toStringBinary(row)); - } + // 4. Let the processor scan the rows, generate mutations and add + // waledits + doProcessRowWithTimeout( + processor, now, this, mutations, walEdit, timeout); if (!mutations.isEmpty()) { - // 4. Get a mvcc write number + // 5. Get a mvcc write number writeEntry = mvcc.beginMemstoreInsert(); - // 5. Apply to memstore and a WALEdit + // 6. Apply to memstore for (KeyValue kv : mutations) { kv.setMemstoreTS(writeEntry.getWriteNumber()); - walEdits.add(kv); - addedSize += stores.get(kv.getFamily()).add(kv); + byte[] family = kv.getFamily(); + checkFamily(family); + addedSize += stores.get(family).add(kv); } long txid = 0; - // 6. Append no sync - if (!walEdits.isEmpty()) { + // 7. Append no sync + if (!walEdit.isEmpty()) { txid = this.log.appendNoSync(this.regionInfo, - this.htableDescriptor.getName(), walEdits, + this.htableDescriptor.getName(), walEdit, processor.getClusterId(), now, this.htableDescriptor); } - // 7. Release region lock + // 8. Release region lock if (locked) { this.updatesLock.readLock().unlock(); locked = false; } - // 8. Release row lock - if (rowLockID != null) { - releaseRowLock(rowLockID); - rowLockID = null; + // 9. Release row lock(s) + if (acquiredLocks != null) { + for (Integer lid : acquiredLocks) { + releaseRowLock(lid); + } + acquiredLocks = null; } - // 9. Sync edit log - if (txid != 0) { + // 10. Sync edit log + if (txid != 0 && + (this.regionInfo.isMetaRegion() || + !this.htableDescriptor.isDeferredLogFlush())) { this.log.sync(txid); } walSyncSuccessful = true; @@ -4476,12 +4365,13 @@ public class HRegion implements HeapSize { // , Writable{ } finally { if (!mutations.isEmpty() && !walSyncSuccessful) { LOG.warn("Wal sync failed. 
Roll back " + mutations.size() + - " memstore keyvalues for row:" + processor.getRow()); + " memstore keyvalues for row(s):" + + processor.getRowsToLock().iterator().next() + "..."); for (KeyValue kv : mutations) { stores.get(kv.getFamily()).rollback(kv); } } - // 10. Roll mvcc forward + // 11. Roll mvcc forward if (writeEntry != null) { mvcc.completeMemstoreInsert(writeEntry); writeEntry = null; @@ -4490,11 +4380,16 @@ public class HRegion implements HeapSize { // , Writable{ this.updatesLock.readLock().unlock(); locked = false; } - if (rowLockID != null) { - releaseRowLock(rowLockID); - rowLockID = null; + if (acquiredLocks != null) { + for (Integer lid : acquiredLocks) { + releaseRowLock(lid); + } } } + + // 12. Run post-process hook + processor.postProcess(this, walEdit); + } finally { closeRegionOperation(); if (!mutations.isEmpty() && @@ -4506,48 +4401,54 @@ public class HRegion implements HeapSize { // , Writable{ private void doProcessRowWithTimeout(final RowProcessor processor, final long now, - final RowProcessor.RowScanner scanner, + final HRegion region, final List mutations, - final WALEdit walEdits, + final WALEdit walEdit, final long timeout) throws IOException { + // Short circuit the no time bound case. + if (timeout < 0) { + try { + processor.process(now, region, mutations, walEdit); + } catch (IOException e) { + LOG.warn("RowProcessor:" + processor.getClass().getName() + + " throws Exception on row(s):" + + Bytes.toStringBinary( + processor.getRowsToLock().iterator().next()) + "...", e); + throw e; + } + return; + } + + // Case with time bound FutureTask task = new FutureTask(new Callable() { @Override public Void call() throws IOException { - processor.process(now, scanner, mutations, walEdits); - return null; + try { + processor.process(now, region, mutations, walEdit); + return null; + } catch (IOException e) { + LOG.warn("RowProcessor:" + processor.getClass().getName() + + " throws Exception on row(s):" + + Bytes.toStringBinary( + processor.getRowsToLock().iterator().next()) + "...", e); + throw e; + } } }); - Thread t = new Thread(task); - t.setDaemon(true); - t.start(); + rowProcessorExecutor.execute(task); try { task.get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException te) { - LOG.error("RowProcessor timeout on row:" + - Bytes.toStringBinary(processor.getRow()) + " timeout:" + timeout, te); + LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" + + Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) + + "..."); throw new IOException(te); } catch (Exception e) { throw new IOException(e); } } - final private RowProcessor.RowScanner rowScanner = - new RowProcessor.RowScanner() { - @Override - public void doScan(Scan scan, List result) throws IOException { - InternalScanner scanner = null; - try { - scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED); - scanner = HRegion.this.getScanner(scan); - result.clear(); - scanner.next(result); - } finally { - if (scanner != null) scanner.close(); - } - } - }; - // TODO: There's a lot of boiler plate code identical // to increment... See how to better unify that. 
/** diff --git src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java new file mode 100644 index 0000000..208de5c --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * A MultiRowProcessor that performs multiple puts and deletes. + */ +class MultiRowMutationProcessor extends BaseRowProcessor { + Collection rowsToLock; + Collection mutations; + + MultiRowMutationProcessor(Collection mutations, + Collection rowsToLock) { + this.rowsToLock = rowsToLock; + this.mutations = mutations; + } + + @Override + public Collection getRowsToLock() { + return rowsToLock; + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, + HRegion region, + List mutationKvs, + WALEdit walEdit) throws IOException { + byte[] byteNow = Bytes.toBytes(now); + // Check mutations and apply edits to a single WALEdit + for (Mutation m : mutations) { + if (m instanceof Put) { + Map> familyMap = m.getFamilyMap(); + region.checkFamilies(familyMap.keySet()); + region.checkTimestamps(familyMap, now); + region.updateKVTimestamps(familyMap.values(), byteNow); + } else if (m instanceof Delete) { + Delete d = (Delete) m; + region.prepareDelete(d); + region.prepareDeleteTimestamps(d, byteNow); + } else { + throw new DoNotRetryIOException( + "Action must be Put or Delete. 
But was: " + + m.getClass().getName()); + } + for (List edits : m.getFamilyMap().values()) { + boolean writeToWAL = m.getWriteToWAL(); + for (KeyValue kv : edits) { + mutationKvs.add(kv); + if (writeToWAL) { + walEdit.add(kv); + } + } + } + } + } + + @Override + public void preProcess(HRegion region, WALEdit walEdit) throws IOException { + RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + if (coprocessorHost != null) { + for (Mutation m : mutations) { + if (m instanceof Put) { + if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) { + // by pass everything + return; + } + } else if (m instanceof Delete) { + Delete d = (Delete) m; + region.prepareDelete(d); + if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) { + // by pass everything + return; + } + } + } + } + } + + @Override + public void postProcess(HRegion region, WALEdit walEdit) throws IOException { + RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost(); + if (coprocessorHost != null) { + for (Mutation m : mutations) { + if (m instanceof Put) { + coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL()); + } else if (m instanceof Delete) { + coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL()); + } + } + } + } + +} diff --git src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java new file mode 100644 index 0000000..15c67f2 --- /dev/null +++ src/main/java/org/apache/hadoop/hbase/regionserver/RowProcessor.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.UUID; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +@InterfaceAudience.Public +@InterfaceStability.Evolving + +/** + * Defines the procedure to atomically perform multiple scans and mutations + * on a HRegion. + * + * This is invoked by {@link HRegion#processRowsWithLocks()}. + * This class performs scans and generates mutations and WAL edits. + * The locks and MVCC will be handled by HRegion. + * + * The generic type parameter T is the return type of + * RowProcessor.getResult(). + */ +public interface RowProcessor { + + /** + * Rows to lock while operation. + * They have to be sorted with RowProcessor + * to avoid deadlock. + */ + Collection getRowsToLock(); + + /** + * Obtain the processing result + */ + T getResult(); + + /** + * Is this operation read only? 
If this is true, process() should not add + * any mutations or it throws IOException. + * @return ture if read only operation + */ + boolean readOnly(); + + /** + * HRegion handles the locks and MVCC and invokes this method properly. + * + * You should override this to create your own RowProcessor. + * + * If you are doing read-modify-write here, you should consider using + * IsolationLevel.READ_UNCOMMITTED for scan because + * we advance MVCC after releasing the locks for optimization purpose. + * + * @param now the current system millisecond + * @param region the HRegion + * @param mutations the output mutations to apply to memstore + * @param walEdit the output WAL edits to apply to write ahead log + */ + void process(long now, + HRegion region, + List mutations, + WALEdit walEdit) throws IOException; + + /** + * The hook to be executed before process(). + * + * @param region the HRegion + * @param walEdit the output WAL edits to apply to write ahead log + */ + void preProcess(HRegion region, WALEdit walEdit) throws IOException; + + /** + * The hook to be executed after process(). + * + * @param region the HRegion + * @param walEdit the output WAL edits to apply to write ahead log + */ + void postProcess(HRegion region, WALEdit walEdit) throws IOException; + + + /** + * @return The replication cluster id. + */ + UUID getClusterId(); + +} diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java deleted file mode 100644 index a4e495e..0000000 --- src/test/java/org/apache/hadoop/hbase/coprocessor/TestProcessRowEndpoint.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.coprocessor; - -import static org.junit.Assert.assertEquals; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.wal.HLog; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -import com.sun.org.apache.commons.logging.Log; -import com.sun.org.apache.commons.logging.LogFactory; - -/** - * Verifies ProcessRowEndpoint works. - * The tested RowProcessor performs two scans and a read-modify-write. - */ -@Category(SmallTests.class) -public class TestProcessRowEndpoint { - - static final Log LOG = LogFactory.getLog(TestProcessRowEndpoint.class); - - private static final byte[] TABLE = Bytes.toBytes("testtable"); - private static final byte[] TABLE2 = Bytes.toBytes("testtable2"); - private final static byte[] ROW = Bytes.toBytes("testrow"); - private final static byte[] FAM = Bytes.toBytes("friendlist"); - - // Column names - private final static byte[] A = Bytes.toBytes("a"); - private final static byte[] B = Bytes.toBytes("b"); - private final static byte[] C = Bytes.toBytes("c"); - private final static byte[] D = Bytes.toBytes("d"); - private final static byte[] E = Bytes.toBytes("e"); - private final static byte[] F = Bytes.toBytes("f"); - private final static byte[] G = Bytes.toBytes("g"); - private final static byte[] REQUESTS = Bytes.toBytes("requests"); - - private static HBaseTestingUtility util = new HBaseTestingUtility(); - private volatile int numRequests; - - private CountDownLatch startSignal; - private CountDownLatch doneSignal; - - @BeforeClass - public static void setupBeforeClass() throws Exception { - Configuration conf = util.getConfiguration(); - conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - FriendsOfFriendsEndpoint.class.getName()); - util.startMiniCluster(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - util.shutdownMiniCluster(); - } - - @Test - public void testSingle() throws Throwable { - HTable table = prepareTestData(TABLE, util); - verifyProcessRow(table); - assertEquals(1, numRequests); - } - - private void verifyProcessRow(HTable table) throws Throwable { - - FriendsOfFriendsProtocol processor = - table.coprocessorProxy(FriendsOfFriendsProtocol.class, ROW); - Result result = processor.query(ROW, A); - - Set friendsOfFriends = new HashSet(); - for (KeyValue kv : result.raw()) { - if (Bytes.equals(kv.getQualifier(), REQUESTS)) { - numRequests = Bytes.toInt(kv.getValue()); - continue; - } - for (byte val : kv.getValue()) { - 
friendsOfFriends.add((char)val + ""); - } - } - Set expected = - new HashSet(Arrays.asList(new String[]{"d", "e", "f", "g"})); - assertEquals(expected, friendsOfFriends); - } - - @Test - public void testThreads() throws Exception { - HTable table = prepareTestData(TABLE2, util); - int numThreads = 1000; - startSignal = new CountDownLatch(numThreads); - doneSignal = new CountDownLatch(numThreads); - for (int i = 0; i < numThreads; ++i) { - new Thread(new QueryRunner(table)).start(); - startSignal.countDown(); - } - doneSignal.await(); - Get get = new Get(ROW); - LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list())); - assertEquals(numThreads, numRequests); - } - - class QueryRunner implements Runnable { - final HTable table; - QueryRunner(final HTable table) { - this.table = table; - } - @Override - public void run() { - try { - startSignal.await(); - verifyProcessRow(table); - } catch (Throwable e) { - e.printStackTrace(); - } - doneSignal.countDown(); - } - } - - static HTable prepareTestData(byte[] tableName, HBaseTestingUtility util) - throws Exception { - HTable table = util.createTable(tableName, FAM); - Put put = new Put(ROW); - put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A - put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B - put.add(FAM, C, G); // G is a friend of C - table.put(put); - return table; - } - - /** - * Coprocessor protocol that finds friends of friends of a person and - * update the number of requests. - */ - public static interface FriendsOfFriendsProtocol extends CoprocessorProtocol { - - /** - * Query a person's friends of friends - */ - Result query(byte[] row, byte[] person) throws IOException; - } - - /** - * Finds friends of friends of a person and update the number of requests. - */ - public static class FriendsOfFriendsEndpoint extends BaseEndpointCoprocessor - implements FriendsOfFriendsProtocol, RowProcessor { - byte[] row = null; - byte[] person = null; - Result result = null; - - // - // FriendsOfFriendsProtocol method - // - - @Override - public Result query(byte[] row, byte[] person) throws IOException { - this.row = row; - this.person = person; - HRegion region = - ((RegionCoprocessorEnvironment) getEnvironment()).getRegion(); - region.processRow(this); - return this.getResult(); - } - - // - // RowProcessor methods - // - - FriendsOfFriendsEndpoint() { - } - - @Override - public byte[] getRow() { - return row; - } - - @Override - public Result getResult() { - return result; - } - - @Override - public boolean readOnly() { - return false; - } - - @Override - public void process(long now, RowProcessor.RowScanner scanner, - List mutations, WALEdit walEdit) throws IOException { - List kvs = new ArrayList(); - { // First scan to get friends of the person and numRequests - Scan scan = new Scan(row, row); - scan.addColumn(FAM, person); - scan.addColumn(FAM, REQUESTS); - scanner.doScan(scan, kvs); - } - LOG.debug("first scan:" + stringifyKvs(kvs)); - int numRequests = 0; - // Second scan to get friends of friends - Scan scan = new Scan(row, row); - for (KeyValue kv : kvs) { - if (Bytes.equals(kv.getQualifier(), REQUESTS)) { - numRequests = Bytes.toInt(kv.getValue()); - continue; - } - byte[] friends = kv.getValue(); - for (byte f : friends) { - scan.addColumn(FAM, new byte[]{f}); - } - } - scanner.doScan(scan, kvs); - - LOG.debug("second scan:" + stringifyKvs(kvs)); - numRequests += 1; - // Construct mutations and Result - KeyValue kv = new KeyValue( - row, FAM, REQUESTS, now, Bytes.toBytes(numRequests)); - 
mutations.clear(); - mutations.add(kv); - kvs.add(kv); - LOG.debug("final result:" + stringifyKvs(kvs) + - " mutations:" + stringifyKvs(mutations)); - result = new Result(kvs); - // Inject some meta data to the walEdit - KeyValue metaKv = new KeyValue( - getRow(), HLog.METAFAMILY, - Bytes.toBytes("FriendsOfFriends query"), - person); - walEdit.add(metaKv); - } - - @Override - public void readFields(DataInput in) throws IOException { - this.person = Bytes.readByteArray(in); - this.row = Bytes.readByteArray(in); - this.result = new Result(); - result.readFields(in); - } - - @Override - public void write(DataOutput out) throws IOException { - Bytes.writeByteArray(out, person); - Bytes.writeByteArray(out, row); - if (result == null) { - new Result().write(out); - } else { - result.write(out); - } - } - - @Override - public UUID getClusterId() { - return HConstants.DEFAULT_CLUSTER_ID; - } - } - - static String stringifyKvs(Collection kvs) { - StringBuilder out = new StringBuilder(); - out.append("["); - for (KeyValue kv : kvs) { - byte[] col = kv.getQualifier(); - byte[] val = kv.getValue(); - if (Bytes.equals(col, REQUESTS)) { - out.append(Bytes.toStringBinary(col) + ":" + - Bytes.toInt(val) + " "); - } else { - out.append(Bytes.toStringBinary(col) + ":" + - Bytes.toStringBinary(val) + " "); - } - } - out.append("]"); - return out.toString(); - } - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} diff --git src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java new file mode 100644 index 0000000..96a0611 --- /dev/null +++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestRowProcessorEndpoint.java @@ -0,0 +1,592 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Verifies RowProcessorEndpoint works.
+ * The tested RowProcessor performs two scans and a read-modify-write.
+ */
+@Category(SmallTests.class)
+public class TestRowProcessorEndpoint {
+
+  static final Log LOG = LogFactory.getLog(TestRowProcessorEndpoint.class);
+
+  private static final byte[] TABLE = Bytes.toBytes("testtable");
+  private final static byte[] ROW = Bytes.toBytes("testrow");
+  private final static byte[] ROW2 = Bytes.toBytes("testrow2");
+  private final static byte[] FAM = Bytes.toBytes("friendlist");
+
+  // Column names
+  private final static byte[] A = Bytes.toBytes("a");
+  private final static byte[] B = Bytes.toBytes("b");
+  private final static byte[] C = Bytes.toBytes("c");
+  private final static byte[] D = Bytes.toBytes("d");
+  private final static byte[] E = Bytes.toBytes("e");
+  private final static byte[] F = Bytes.toBytes("f");
+  private final static byte[] G = Bytes.toBytes("g");
+  private final static byte[] COUNTER = Bytes.toBytes("counter");
+  private final AtomicInteger failures = new AtomicInteger(0);
+
+  private static HBaseTestingUtility util = new HBaseTestingUtility();
+  private static volatile int expectedCounter = 0;
+  private static int rowSize, row2Size;
+
+  private volatile static HTable table = null;
+  private volatile static boolean swapped = false;
+  private volatile CountDownLatch startSignal;
+  private volatile CountDownLatch doneSignal;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    Configuration conf = util.getConfiguration();
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        RowProcessorEndpoint.class.getName());
+    conf.setInt("hbase.client.retries.number", 1);
+    conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
+    util.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  public void prepareTestData() throws Exception {
+    try {
util.getHBaseAdmin().disableTable(TABLE); + util.getHBaseAdmin().deleteTable(TABLE); + } catch (Exception e) { + // ignore table not found + } + table = util.createTable(TABLE, FAM); + { + Put put = new Put(ROW); + put.add(FAM, A, Bytes.add(B, C)); // B, C are friends of A + put.add(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B + put.add(FAM, C, G); // G is a friend of C + table.put(put); + rowSize = put.size(); + } + Put put = new Put(ROW2); + put.add(FAM, D, E); + put.add(FAM, F, G); + table.put(put); + row2Size = put.size(); + } + + @Test + public void testDoubleScan() throws Throwable { + prepareTestData(); + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.FriendsOfFriendsProcessor processor = + new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A); + Set result = protocol.process(processor); + + Set expected = + new HashSet(Arrays.asList(new String[]{"d", "e", "f", "g"})); + Get get = new Get(ROW); + LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list())); + assertEquals(expected, result); + } + + @Test + public void testReadModifyWrite() throws Throwable { + prepareTestData(); + failures.set(0); + int numThreads = 1000; + concurrentExec(new IncrementRunner(), numThreads); + Get get = new Get(ROW); + LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).list())); + int finalCounter = incrementCounter(table); + assertEquals(numThreads + 1, finalCounter); + assertEquals(0, failures.get()); + } + + class IncrementRunner implements Runnable { + @Override + public void run() { + try { + incrementCounter(table); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + + private int incrementCounter(HTable table) throws Throwable { + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.IncrementCounterProcessor processor = + new RowProcessorEndpoint.IncrementCounterProcessor(ROW); + int counterValue = protocol.process(processor); + return counterValue; + } + + private void concurrentExec( + final Runnable task, final int numThreads) throws Throwable { + startSignal = new CountDownLatch(numThreads); + doneSignal = new CountDownLatch(numThreads); + for (int i = 0; i < numThreads; ++i) { + new Thread(new Runnable() { + @Override + public void run() { + try { + startSignal.countDown(); + startSignal.await(); + task.run(); + } catch (Throwable e) { + failures.incrementAndGet(); + e.printStackTrace(); + } + doneSignal.countDown(); + } + }).start(); + } + doneSignal.await(); + } + + @Test + public void testMultipleRows() throws Throwable { + prepareTestData(); + failures.set(0); + int numThreads = 1000; + concurrentExec(new SwapRowsRunner(), numThreads); + LOG.debug("row keyvalues:" + + stringifyKvs(table.get(new Get(ROW)).list())); + LOG.debug("row2 keyvalues:" + + stringifyKvs(table.get(new Get(ROW2)).list())); + assertEquals(rowSize, table.get(new Get(ROW)).list().size()); + assertEquals(row2Size, table.get(new Get(ROW2)).list().size()); + assertEquals(0, failures.get()); + } + + class SwapRowsRunner implements Runnable { + @Override + public void run() { + try { + swapRows(table); + } catch (Throwable e) { + e.printStackTrace(); + } + } + } + + private void swapRows(HTable table) throws Throwable { + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.RowSwapProcessor processor = + new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2); + protocol.process(processor); + } + + 
@Test + public void testTimeout() throws Throwable { + prepareTestData(); + RowProcessorProtocol protocol = + table.coprocessorProxy(RowProcessorProtocol.class, ROW); + RowProcessorEndpoint.TimeoutProcessor processor = + new RowProcessorEndpoint.TimeoutProcessor(ROW); + boolean exceptionCaught = false; + try { + protocol.process(processor); + } catch (Exception e) { + exceptionCaught = true; + } + assertTrue(exceptionCaught); + } + + /** + * This class defines two RowProcessors: + * IncrementCounterProcessor and FriendsOfFriendsProcessor. + * + * We define the RowProcessors as the inner class of the endpoint. + * So they can be loaded with the endpoint on the coprocessor. + */ + public static class RowProcessorEndpoint extends BaseRowProcessorEndpoint + implements RowProcessorProtocol { + + public static class IncrementCounterProcessor extends + BaseRowProcessor implements Writable { + int counter = 0; + byte[] row = new byte[0]; + + /** + * Empty constructor for Writable + */ + IncrementCounterProcessor() { + } + + IncrementCounterProcessor(byte[] row) { + this.row = row; + } + + @Override + public Collection getRowsToLock() { + return Collections.singleton(row); + } + + @Override + public Integer getResult() { + return counter; + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + // Scan current counter + List kvs = new ArrayList(); + Scan scan = new Scan(row, row); + scan.addColumn(FAM, COUNTER); + doScan(region, scan, kvs); + counter = kvs.size() == 0 ? 0 : + Bytes.toInt(kvs.iterator().next().getValue()); + + // Assert counter value + assertEquals(expectedCounter, counter); + + // Increment counter and send it to both memstore and wal edit + counter += 1; + expectedCounter += 1; + + + KeyValue kv = + new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter)); + mutations.add(kv); + walEdit.add(kv); + + // We can also inject some meta data to the walEdit + KeyValue metaKv = new KeyValue( + row, HLog.METAFAMILY, + Bytes.toBytes("I just increment counter"), + Bytes.toBytes(counter)); + walEdit.add(metaKv); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.row = Bytes.readByteArray(in); + this.counter = in.readInt(); + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, row); + out.writeInt(counter); + } + + } + + public static class FriendsOfFriendsProcessor extends + BaseRowProcessor> implements Writable { + byte[] row = null; + byte[] person = null; + final Set result = new HashSet(); + + /** + * Empty constructor for Writable + */ + FriendsOfFriendsProcessor() { + } + + FriendsOfFriendsProcessor(byte[] row, byte[] person) { + this.row = row; + this.person = person; + } + + @Override + public Collection getRowsToLock() { + return Collections.singleton(row); + } + + @Override + public Set getResult() { + return result; + } + + @Override + public boolean readOnly() { + return true; + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + List kvs = new ArrayList(); + { // First scan to get friends of the person + Scan scan = new Scan(row, row); + scan.addColumn(FAM, person); + doScan(region, scan, kvs); + } + + // Second scan to get friends of friends + Scan scan = new Scan(row, row); + for (KeyValue kv : kvs) { + byte[] friends = kv.getValue(); + for (byte f : friends) { + scan.addColumn(FAM, new 
byte[]{f}); + } + } + doScan(region, scan, kvs); + + // Collect result + result.clear(); + for (KeyValue kv : kvs) { + for (byte b : kv.getValue()) { + result.add((char)b + ""); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.person = Bytes.readByteArray(in); + this.row = Bytes.readByteArray(in); + int size = in.readInt(); + result.clear(); + for (int i = 0; i < size; ++i) { + result.add(Text.readString(in)); + } + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, person); + Bytes.writeByteArray(out, row); + out.writeInt(result.size()); + for (String s : result) { + Text.writeString(out, s); + } + } + } + + public static class RowSwapProcessor extends + BaseRowProcessor> implements Writable { + byte[] row1 = new byte[0]; + byte[] row2 = new byte[0]; + + /** + * Empty constructor for Writable + */ + RowSwapProcessor() { + } + + RowSwapProcessor(byte[] row1, byte[] row2) { + this.row1 = row1; + this.row2 = row2; + } + + @Override + public Collection getRowsToLock() { + List rows = new ArrayList(); + rows.add(row1); + rows.add(row2); + return rows; + } + + @Override + public boolean readOnly() { + return false; + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + + // Scan both rows + List kvs1 = new ArrayList(); + List kvs2 = new ArrayList(); + doScan(region, new Scan(row1, row1), kvs1); + doScan(region, new Scan(row2, row2), kvs2); + + // Assert swapped + if (swapped) { + assertEquals(rowSize, kvs2.size()); + assertEquals(row2Size, kvs1.size()); + } else { + assertEquals(rowSize, kvs1.size()); + assertEquals(row2Size, kvs2.size()); + } + swapped = !swapped; + + // Add and delete keyvalues + List> kvs = new ArrayList>(); + kvs.add(kvs1); + kvs.add(kvs2); + byte[][] rows = new byte[][]{row1, row2}; + for (int i = 0; i < kvs.size(); ++i) { + for (KeyValue kv : kvs.get(i)) { + // Delete from the current row and add to the other row + KeyValue kvDelete = + new KeyValue(rows[i], kv.getFamily(), kv.getQualifier(), + kv.getTimestamp(), KeyValue.Type.Delete); + KeyValue kvAdd = + new KeyValue(rows[1 - i], kv.getFamily(), kv.getQualifier(), + now, kv.getValue()); + mutations.add(kvDelete); + walEdit.add(kvDelete); + mutations.add(kvAdd); + walEdit.add(kvAdd); + } + } + } + + @Override + public void readFields(DataInput in) throws IOException { + this.row1 = Bytes.readByteArray(in); + this.row2 = Bytes.readByteArray(in); + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, row1); + Bytes.writeByteArray(out, row2); + } + } + + public static class TimeoutProcessor extends + BaseRowProcessor implements Writable { + + byte[] row = new byte[0]; + + /** + * Empty constructor for Writable + */ + public TimeoutProcessor() { + } + + public TimeoutProcessor(byte[] row) { + this.row = row; + } + + public Collection getRowsToLock() { + return Collections.singleton(row); + } + + @Override + public void process(long now, HRegion region, + List mutations, WALEdit walEdit) throws IOException { + try { + // Sleep for a long time so it timeout + Thread.sleep(100 * 1000L); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public boolean readOnly() { + return true; + } + + @Override + public void readFields(DataInput in) throws IOException { + this.row = Bytes.readByteArray(in); + } + + @Override + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, 
row);
+      }
+    }
+
+    public static void doScan(
+        HRegion region, Scan scan, List<KeyValue> result) throws IOException {
+      InternalScanner scanner = null;
+      try {
+        scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+        scanner = region.getScanner(scan);
+        result.clear();
+        scanner.next(result);
+      } finally {
+        if (scanner != null) scanner.close();
+      }
+    }
+  }
+
+  static String stringifyKvs(Collection<KeyValue> kvs) {
+    StringBuilder out = new StringBuilder();
+    out.append("[");
+    if (kvs != null) {
+      for (KeyValue kv : kvs) {
+        byte[] col = kv.getQualifier();
+        byte[] val = kv.getValue();
+        if (Bytes.equals(col, COUNTER)) {
+          out.append(Bytes.toStringBinary(col) + ":" +
+                     Bytes.toInt(val) + " ");
+        } else {
+          out.append(Bytes.toStringBinary(col) + ":" +
+                     Bytes.toStringBinary(val) + " ");
+        }
+      }
+    }
+    out.append("]");
+    return out.toString();
+  }
+
+  @org.junit.Rule
+  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
+    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
+}
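For anyone trying the new API end to end, here is a minimal, self-contained sketch of a single-row atomic increment built on the classes added above. It is illustrative only and not part of the patch: the class name CounterProcessor and the f:q column are made up, and the processors in TestRowProcessorEndpoint remain the authoritative examples. In a real deployment the processor would live inside your endpoint subclass so it is class-loaded together with it, as the BaseRowProcessorEndpoint javadoc recommends.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RowProcessorProtocol;
import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;

/** Hypothetical single-row atomic increment using the new RowProcessor API. */
public class CounterProcessor extends BaseRowProcessor<Integer>
    implements Writable {
  private static final byte[] FAM = Bytes.toBytes("f");   // assumed family
  private static final byte[] QUAL = Bytes.toBytes("q");  // assumed qualifier

  private byte[] row = new byte[0];
  private int result = 0;

  public CounterProcessor() {}                  // required for Writable
  public CounterProcessor(byte[] row) { this.row = row; }

  @Override
  public Collection<byte[]> getRowsToLock() {
    return Collections.singleton(row);          // single-row transaction
  }

  @Override
  public boolean readOnly() { return false; }

  @Override
  public Integer getResult() { return result; }

  @Override
  public void process(long now, HRegion region,
      List<KeyValue> mutations, WALEdit walEdit) throws IOException {
    // Read the current value while HRegion holds the row lock.
    List<KeyValue> kvs = new ArrayList<KeyValue>();
    Scan scan = new Scan(row, row);
    scan.addColumn(FAM, QUAL);
    // READ_UNCOMMITTED because the MVCC is advanced after the locks
    // are released (see the RowProcessor javadoc above).
    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    InternalScanner scanner = region.getScanner(scan);
    try {
      scanner.next(kvs);
    } finally {
      scanner.close();
    }
    result = (kvs.isEmpty() ? 0 : Bytes.toInt(kvs.get(0).getValue())) + 1;

    // Emit the incremented value to both the memstore and the WAL.
    KeyValue kv = new KeyValue(row, FAM, QUAL, now, Bytes.toBytes(result));
    mutations.add(kv);
    walEdit.add(kv);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    row = Bytes.readByteArray(in);
    result = in.readInt();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    Bytes.writeByteArray(out, row);
    out.writeInt(result);
  }

  /** Client-side call: routes to the region serving the row and returns
   *  the post-increment value computed inside processRowsWithLocks(). */
  public static int increment(HTable table, byte[] row) throws IOException {
    RowProcessorProtocol protocol =
        table.coprocessorProxy(RowProcessorProtocol.class, row);
    return protocol.process(new CounterProcessor(row));
  }
}

As in the tests, the processor implements Writable so it can be shipped to the region server inside the endpoint RPC, and keeping the row set to a single row makes the lock-ordering requirement of getRowsToLock() trivial.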