Index: src/test/java/org/apache/hadoop/hbase/coprocessor/ThreadInterleavingLongColumnInterpreter.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/ThreadInterleavingLongColumnInterpreter.java (revision 0)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/ThreadInterleavingLongColumnInterpreter.java (revision 0)
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+
+import com.google.common.base.Throwables;
+
+/**
+ * This class has all the functionality of LongColumnInterpreter, but when
+ * constructed with shouldInterleave == true it tries to interleave calling
+ * threads and cause havoc (while still honoring its contract).
+ *
+ * The default constructor exists because we only want this gremlin active
+ * on the test client, not the test server, and there is no obvious way to
+ * hand the server a plain LongColumnInterpreter. The test server uses the
+ * default constructor, which disables interleaving.
+ */
+public class ThreadInterleavingLongColumnInterpreter extends LongColumnInterpreter {
+
+  private boolean shouldInterleave = false;
+  private volatile long lastUpdateTimeMS;
+  private static final long SOFT_WINDOW_MS = 500;
+
+  public ThreadInterleavingLongColumnInterpreter(boolean shouldInterleave) {
+    this.shouldInterleave = shouldInterleave;
+  }
+
+  public ThreadInterleavingLongColumnInterpreter() {
+    shouldInterleave = false;
+  }
+
+  private void waitIfNecessary() {
+    if (shouldInterleave) {
+      // This is essentially a "soft latch". Every caller refreshes the shared
+      // timestamp, then waits until no new waiter has arrived recently, at
+      // which point all waiters proceed together. We don't use a hard latch
+      // since that would require knowing exactly how many threads to wait for.
+      lastUpdateTimeMS = System.currentTimeMillis();
+      while (System.currentTimeMillis() - lastUpdateTimeMS < SOFT_WINDOW_MS) {
+        try {
+          // Math.max guards against a negative sleep argument if the window
+          // expires between the loop check and this computation.
+          Thread.sleep(10 + Math.max(0,
+              SOFT_WINDOW_MS - (System.currentTimeMillis() - lastUpdateTimeMS)));
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw Throwables.propagate(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public int compare(final Long l1, final Long l2) {
+    int result = super.compare(l1, l2);
+    waitIfNecessary();
+    return result;
+  }
+
+  @Override
+  public Long add(Long l1, Long l2) {
+    Long result = super.add(l1, l2);
+    waitIfNecessary();
+    return result;
+  }
+
+  @Override
+  public Long increment(Long o) {
+    Long result = super.increment(o);
+    waitIfNecessary();
+    return result;
+  }
+
+  @Override
+  public Long multiply(Long l1, Long l2) {
+    Long result = super.multiply(l1, l2);
+    waitIfNecessary();
+    return result;
+  }
+
+  @Override
+  public double divideForAvg(Long l1, Long l2) {
+    double result = super.divideForAvg(l1, l2);
+    waitIfNecessary();
+    return result;
+  }
+
+}
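The "soft latch" above is the heart of the gremlin: every thread entering waitIfNecessary() refreshes the shared volatile timestamp and then spins until the window has been quiet, so callers pile up and are released at roughly the same instant, maximizing the odds of interleaved callback updates. Here is a minimal standalone sketch of the same idea for reviewers unfamiliar with the pattern; it is illustrative only, not part of the patch, and SoftLatch / arriveAndAwaitQuiet are made-up names:

import java.util.concurrent.TimeUnit;

/**
 * Minimal sketch of a "soft latch" (not a real HBase class): each arriving
 * thread refreshes a shared timestamp, and all threads block until no new
 * arrival has been seen for quietWindowMs, then proceed together.
 */
class SoftLatch {
  private final long quietWindowMs;
  private volatile long lastArrivalMs;

  SoftLatch(long quietWindowMs) {
    this.quietWindowMs = quietWindowMs;
  }

  void arriveAndAwaitQuiet() throws InterruptedException {
    lastArrivalMs = System.currentTimeMillis(); // register this arrival
    long sinceLast;
    // A later arrival by another thread refreshes lastArrivalMs and
    // extends everyone's wait.
    while ((sinceLast = System.currentTimeMillis() - lastArrivalMs) < quietWindowMs) {
      TimeUnit.MILLISECONDS.sleep(Math.max(10, quietWindowMs - sinceLast));
    }
  }

  public static void main(String[] args) {
    final SoftLatch latch = new SoftLatch(500);
    for (int i = 0; i < 4; i++) {
      final int id = i;
      new Thread(new Runnable() {
        public void run() {
          try {
            Thread.sleep(id * 100L); // threads arrive staggered...
            latch.arriveAndAwaitQuiet();
            // ...but are released at roughly the same moment
            System.out.println("thread " + id + " released at "
                + System.currentTimeMillis());
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      }).start();
    }
  }
}

Unlike CountDownLatch or CyclicBarrier, no participant count is needed up front, which is exactly why it fits here: the interpreter cannot know how many region callbacks, and hence threads, will arrive.
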
Index: src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java
===================================================================
--- src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (revision 1099910)
+++ src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateProtocol.java (working copy)
@@ -318,6 +318,17 @@
   }
 
   @Test
+  public void testMaxWithForcedRaceCondition() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    final ColumnInterpreter<Long, Long> ci =
+        new ThreadInterleavingLongColumnInterpreter(true);
+    long maximum = aClient.max(TEST_TABLE, ci, scan);
+    assertEquals(190, maximum);
+  }
+
+  @Test
   public void testMaxWithInvalidRange() {
     AggregationClient aClient = new AggregationClient(conf);
     final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
@@ -425,6 +436,19 @@
   }
 
   @Test
+  public void testMinWithForcedRaceCondition() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addColumn(TEST_FAMILY, TEST_QUALIFIER);
+    scan.setStartRow(ROWS[5]);
+    scan.setStopRow(ROWS[15]);
+    final ColumnInterpreter<Long, Long> ci =
+        new ThreadInterleavingLongColumnInterpreter(true);
+    long min = aClient.min(TEST_TABLE, ci, scan);
+    assertEquals(5, min);
+  }
+
+  @Test
   public void testMinWithValidRangeWithNullCF() {
     AggregationClient aClient = new AggregationClient(conf);
     Scan scan = new Scan();
@@ -539,6 +563,18 @@
     long sum = aClient.sum(TEST_TABLE, ci, scan);
     assertEquals(6 + 60, sum);
   }
+
+  @Test
+  public void testSumWithForcedRaceCondition() throws Throwable {
+    AggregationClient aClient = new AggregationClient(conf);
+    Scan scan = new Scan();
+    scan.addFamily(TEST_FAMILY);
+    scan.setStartRow(ROWS[6]);
+    scan.setStopRow(ROWS[7]);
+    final ColumnInterpreter<Long, Long> ci =
+        new ThreadInterleavingLongColumnInterpreter(true);
+    long sum = aClient.sum(TEST_TABLE, ci, scan);
+    assertEquals(6 + 60, sum);
+  }
 
   @Test
   public void testSumWithValidRangeWithNullCF() {
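All three tests force concurrent callback delivery through the interleaving interpreter and assert the same results as their single-threaded counterparts. The failure mode they hunt is the classic lost update: AggregationClient delivers one update() per region, potentially from several threads at once, so an unsynchronized read-compare-write can silently drop a region's result. A self-contained sketch of that race, with MaxCallback as a hypothetical stand-in for the real callback classes (illustrative, not part of the patch):

import java.util.ArrayList;
import java.util.List;

/** Demonstrates the lost-update race that the synchronized fix closes. */
class LostUpdateDemo {
  /** Hypothetical stand-in for AggregationClient's max callback. */
  static class MaxCallback {
    Long max = null;

    // Unsynchronized read-compare-write: two threads can both read a stale
    // max, and the larger of their two results can be overwritten and lost.
    void update(Long result) {
      max = (max == null || max.compareTo(result) < 0) ? result : max;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    for (int trial = 0; trial < 10000; trial++) {
      final MaxCallback cb = new MaxCallback();
      List<Thread> threads = new ArrayList<Thread>();
      for (long v = 1; v <= 8; v++) {
        final long value = v;
        Thread t = new Thread(new Runnable() {
          public void run() {
            cb.update(value);
          }
        });
        threads.add(t);
        t.start();
      }
      for (Thread t : threads) {
        t.join();
      }
      // With "synchronized void update" this could never fire.
      if (cb.max != 8L) {
        System.out.println("lost update on trial " + trial + ": max=" + cb.max);
      }
    }
  }
}

The race is probabilistic and may take many trials to show up, which is why the tests force the interleaving rather than hoping to hit it.
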
Index: src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (revision 1099910)
+++ src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java (working copy)
@@ -23,6 +23,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -92,7 +93,7 @@
     }
 
     @Override
-    public void update(byte[] region, byte[] row, R result) {
+    public synchronized void update(byte[] region, byte[] row, R result) {
       max = ci.compare(max, result) < 0 ? result : max;
     }
   }
@@ -141,7 +142,7 @@
     }
 
     @Override
-    public void update(byte[] region, byte[] row, R result) {
+    public synchronized void update(byte[] region, byte[] row, R result) {
       min = (min == null || ci.compare(result, min) < 0) ? result : min;
     }
   }
@@ -176,15 +177,15 @@
       final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
     validateParameters(scan);
     class RowNumCallback implements Batch.Callback<Long> {
-      private long rowCountL = 0l;
+      private final AtomicLong rowCountL = new AtomicLong(0);
 
       public long getRowNumCount() {
-        return rowCountL;
+        return rowCountL.get();
       }
 
       @Override
       public void update(byte[] region, byte[] row, Long result) {
-        rowCountL += result.longValue();
+        rowCountL.addAndGet(result.longValue());
       }
     }
     RowNumCallback rowNum = new RowNumCallback();
@@ -219,7 +220,7 @@
     }
 
     @Override
-    public void update(byte[] region, byte[] row, S result) {
+    public synchronized void update(byte[] region, byte[] row, S result) {
       sumVal = ci.add(sumVal, result);
     }
   }
@@ -255,7 +256,7 @@
     }
 
     @Override
-    public void update(byte[] region, byte[] row, Pair<S, Long> result) {
+    public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
       sum = ci.add(sum, result.getFirst());
       rowCount += result.getSecond();
     }
@@ -317,7 +318,7 @@
     }
 
     @Override
-    public void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
+    public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
       sumVal = ci.add(sumVal, result.getFirst().get(0));
       sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
       rowCountVal += result.getSecond();
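
The patch mixes two thread-safety mechanisms, and the split appears deliberate (this is my reading, not stated in the patch): rowCountL is a plain long, so the lock-free AtomicLong.addAndGet is a drop-in replacement for the racy rowCountL += result.longValue(); the remaining callbacks fold results into values of the generic types R/S via ci.compare(...) / ci.add(...), for which no atomic primitive exists, so marking update() synchronized is the simplest general fix. A sketch of the contrast, with made-up names:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BinaryOperator;

/**
 * Contrast of the two fixes used in the patch, in miniature. Illustrative
 * sketch only; CallbackFixes is not a real HBase class.
 */
class CallbackFixes<S> {
  // Case 1: a primitive counter. "count += n" on a plain long is a
  // read-modify-write and can lose updates under concurrency; AtomicLong
  // makes the accumulation lock-free and correct.
  private final AtomicLong count = new AtomicLong();

  void updateCount(long n) {
    count.addAndGet(n);
  }

  // Case 2: a value of an arbitrary generic type folded with a caller-
  // supplied function (ci.add in the patch). There is no atomic primitive
  // for S, so the read-fold-write is serialized under the object lock.
  private S sum;

  synchronized void updateSum(S partial, BinaryOperator<S> add) {
    sum = add.apply(sum, partial);
  }

  long count() {
    return count.get();
  }

  synchronized S sum() {
    return sum;
  }

  public static void main(String[] args) {
    CallbackFixes<Long> cb = new CallbackFixes<Long>();
    cb.updateCount(3);
    // Null-tolerant fold, mirroring how ci.add handles a null accumulator.
    cb.updateSum(5L, (a, b) -> a == null ? b : a + b);
    System.out.println(cb.count() + " " + cb.sum()); // prints: 3 5
  }
}

On Java 8+, an AtomicReference with accumulateAndGet would be a lock-free alternative for the generic case, but synchronized matches the era of this codebase and also keeps multi-field updates, such as sum plus rowCount in the avg callback, atomic as a unit.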