Index: hbase-common/src/main/java/org/apache/hadoop/hbase/util/Counter.java
===================================================================
--- hbase-common/src/main/java/org/apache/hadoop/hbase/util/Counter.java	(revision 0)
+++ hbase-common/src/main/java/org/apache/hadoop/hbase/util/Counter.java	(working copy)
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A highly scalable counter. Thread safe.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class Counter {
+  private static final int MAX_CELLS_LENGTH = 1 << 20;
+  private static final int SUFFERABLE_SPIN_COUNT = 2;
+
+  private static class Cell {
+    // Pads are added around the value to avoid cache-line contention with
+    // another cell's value. The cache-line size is expected to be equal to
+    // or less than about 128 bytes (= 64 bits * 16).
+
+    @SuppressWarnings("unused")
+    volatile long p0, p1, p2, p3, p4, p5, p6;
+    volatile long value;
+    @SuppressWarnings("unused")
+    volatile long q0, q1, q2, q3, q4, q5, q6;
+
+    static final AtomicLongFieldUpdater<Cell> valueUpdater =
+        AtomicLongFieldUpdater.newUpdater(Cell.class, "value");
+
+    Cell() {}
+
+    Cell(long initValue) {
+      value = initValue;
+    }
+
+    long get() {
+      return value;
+    }
+
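+    /**
+     * Adds the delta with CAS, retrying until it succeeds. Returns true if
+     * the first SUFFERABLE_SPIN_COUNT attempts all failed, which the caller
+     * takes as a sign of contention on this cell.
+     */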
+    boolean addAndIsCongested(long delta) {
+      for(int i = 0; i < SUFFERABLE_SPIN_COUNT; i++) {
+        if(add(delta)) {
+          return false;
+        }
+      }
+
+      while(! add(delta)) {}
+
+      return true;
+    }
+
+    boolean add(long delta) {
+      long current = value;
+      return valueUpdater.compareAndSet(this, current, current + delta);
+    }
+  }
+
+  private static class Container {
+    /** The length should be a power of 2. */
+    final Cell[] cells;
+
+    /** True if a new, extended container is going to replace this one. */
+    final AtomicBoolean demoted = new AtomicBoolean();
+
+    Container(Cell cell) {
+      this(new Cell[] { cell });
+    }
+
+    /**
+     * @param cells the length should be a power of 2
+     */
+    Container(Cell[] cells) {
+      this.cells = cells;
+    }
+  }
+
+  private final AtomicReference<Container> containerRef;
+
+  public Counter() {
+    this(new Cell());
+  }
+
+  public Counter(long initValue) {
+    this(new Cell(initValue));
+  }
+
+  private Counter(Cell initCell) {
+    containerRef = new AtomicReference<Container>(new Container(initCell));
+  }
+
+  private static int hash() {
+    return (int) Thread.currentThread().getId();
+  }
+
+  public void add(long delta) {
+    Container container = containerRef.get();
+    Cell[] cells = container.cells;
+    int index = hash() & (cells.length - 1);
+    Cell cell = cells[index];
+
+    if(cell.addAndIsCongested(delta) && cells.length < MAX_CELLS_LENGTH &&
+        container.demoted.compareAndSet(false, true)) {
+
+      if(containerRef.get() == container) {
+        Cell[] newCells = new Cell[cells.length * 2];
+        System.arraycopy(cells, 0, newCells, 0, cells.length);
+        for(int i = cells.length; i < newCells.length; i++) {
+          newCells[i] = new Cell();
+          // Fill all of the new elements with instances up front. Creating
+          // a cell lazily and publishing it through the array later would
+          // raise a visibility (happens-before) problem, because the array
+          // elements themselves are not volatile, so the relation would have
+          // to be established by piggybacking on some other synchronization.
+        }
+        containerRef.compareAndSet(container, new Container(newCells));
+      }
+    }
+  }
+
+  public void increment() {
+    add(1);
+  }
+
+  public void decrement() {
+    add(-1);
+  }
+
+  public void set(long value) {
+    containerRef.set(new Container(new Cell(value)));
+  }
+
+  public long get() {
+    long sum = 0;
+    for(Cell cell : containerRef.get().cells) {
+      sum += cell.get();
+    }
+    return sum;
+  }
+
+  @Override
+  public String toString() {
+    Cell[] cells = containerRef.get().cells;
+
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    long sum = 0;
+
+    for(Cell cell : cells) {
+      long value = cell.get();
+      sum += value;
+      if(min > value) { min = value; }
+      if(max < value) { max = value; }
+    }
+
+    return new StringBuilder(100)
+        .append("[value=").append(sum)
+        .append(", cells=[length=").append(cells.length)
+        .append(", min=").append(min)
+        .append(", max=").append(max)
+        .append("]]").toString();
+  }
+}
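The new class keeps the add/increment/decrement/get/set surface that call sites already use from org.cliffc.high_scale_lib.Counter, so the hunks further down are essentially an import swap. A minimal usage sketch (not part of the patch; the CounterDemo class name is ours):

    import org.apache.hadoop.hbase.util.Counter;

    public class CounterDemo {
      public static void main(String[] args) throws InterruptedException {
        final Counter requests = new Counter();

        Thread[] workers = new Thread[4];
        for (int i = 0; i < workers.length; i++) {
          workers[i] = new Thread() {
            @Override
            public void run() {
              for (int j = 0; j < 100000; j++) {
                requests.increment();  // congested cells trigger array expansion
              }
            }
          };
          workers[i].start();
        }
        for (Thread worker : workers) {
          worker.join();
        }

        System.out.println(requests.get());  // prints 400000
      }
    }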
Index: hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCounter.java
===================================================================
--- hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCounter.java	(revision 0)
+++ hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestCounter.java	(working copy)
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.hbase.MediumTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestCounter {
+  private static final int[] THREAD_COUNTS = {1, 10, 100};
+  private static final int DATA_COUNT = 1000000;
+
+  private interface Operation {
+    void execute();
+  }
+
+  @Test
+  public void testIncrement() throws Exception {
+    for(int threadCount : THREAD_COUNTS) {
+      final Counter counter = new Counter();
+
+      execute(new Operation() {
+        @Override
+        public void execute() {
+          counter.increment();
+        }
+      }, threadCount);
+
+      Assert.assertEquals(threadCount * (long)DATA_COUNT, counter.get());
+    }
+  }
+
+  @Test
+  public void testIncrementAndGet() throws Exception {
+    for(int threadCount: THREAD_COUNTS) {
+      final Counter counter = new Counter();
+
+      execute(new Operation() {
+        @Override
+        public void execute() {
+          counter.increment();
+          counter.get();
+        }
+      }, threadCount);
+
+      Assert.assertEquals(threadCount * (long)DATA_COUNT, counter.get());
+    }
+  }
+
+  private static void execute(final Operation op, int threadCount)
+      throws InterruptedException {
+
+    final CountDownLatch prepareLatch = new CountDownLatch(threadCount);
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch endLatch = new CountDownLatch(threadCount);
+
+    class OperationThread extends Thread {
+      @Override
+      public void run() {
+        try {
+          prepareLatch.countDown();
+          startLatch.await();
+
+          for(int i=0; i<DATA_COUNT; i++) {
+            op.execute();
+          }
+
+          endLatch.countDown();
+
+        } catch(Exception e) {}
+      }
+    }
+
+    for(int i=0; i<threadCount; i++) {
+      new OperationThread().start();
+    }
+
+    prepareLatch.await();
+    startLatch.countDown();
+    endLatch.await();
+  }
+}
Index: hbase-server/pom.xml
===================================================================
--- hbase-server/pom.xml	(revision 1577759)
+++ hbase-server/pom.xml	(working copy)
     <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
     </dependency>
-    <dependency>
-      <groupId>com.github.stephenc.high-scale-lib</groupId>
-      <artifactId>high-scale-lib</artifactId>
-    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
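A note on the expansion path in Counter.add(): a thread picks its cell by masking its thread id with cells.length - 1, which only works because the array length is kept a power of two; doubling the array therefore re-spreads threads across twice as many cells without any rehashing. A standalone illustration of the index computation (IndexDemo is ours, not part of the patch):

    public class IndexDemo {
      public static void main(String[] args) {
        long threadId = 42;  // stands in for Thread.currentThread().getId()
        for (int length = 1; length <= 16; length *= 2) {
          // Same computation as Counter.add(): for a power-of-two length,
          // (length - 1) is an all-ones bit mask, so this is threadId % length.
          int index = (int) threadId & (length - 1);
          System.out.println("cells.length=" + length + " -> index=" + index);
        }
      }
    }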
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java	(revision 1577759)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java	(working copy)
@@ -92,6 +92,7 @@
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
@@ -109,7 +110,6 @@
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.StringUtils;
-import org.cliffc.high_scale_lib.Counter;
 import org.cloudera.htrace.TraceInfo;
 import org.codehaus.jackson.map.ObjectMapper;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java	(revision 1577759)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableMapReduceUtil.java	(working copy)
@@ -67,7 +67,6 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -312,9 +311,6 @@
       HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, HConstants.HFILE_BLOCK_CACHE_SIZE_DEFAULT);
     job.getConfiguration().setFloat("hbase.offheapcache.percentage", 0f);
     job.getConfiguration().setFloat("hbase.bucketcache.size", 0f);
-
-    // We would need even more libraries that hbase-server depends on
-    TableMapReduceUtil.addDependencyJars(job.getConfiguration(), Counter.class);
   }
 
   /**
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java	(revision 1577759)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/metrics/ExactCounterMetric.java	(working copy)
@@ -27,11 +27,11 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.util.MetricsBase;
 import org.apache.hadoop.metrics.util.MetricsRegistry;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.collect.Lists;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java	(revision 1577759)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java	(working copy)
@@ -356,9 +356,6 @@
     // add dependencies (including HBase ones)
     TableMapReduceUtil.addDependencyJars(job);
 
-    // This job instantiates HRegions, which requires the Counter class from the high_scale library
-    TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
-        org.cliffc.high_scale_lib.Counter.class);
     Path stagingDir = JobUtil.getStagingDir(conf);
     try {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(revision 1577759)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java	(working copy)
@@ -133,6 +133,7 @@
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.CompressionTest;
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
@@ -140,7 +141,6 @@
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.util.StringUtils;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -5365,9 +5365,9 @@
       // woefully out of date - currently missing:
       // 1 x HashMap - coprocessorServiceHandlers
-      // 6 org.cliffc.high_scale_lib.Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
+      // 6 x Counter - numMutationsWithoutWAL, dataInMemoryWithoutWAL,
       //   checkAndMutateChecksPassed, checkAndMutateChecksFailed, readRequestsCount,
-      //   writeRequestsCount, updatesBlockedMs
+      //   writeRequestsCount
       // 1 x HRegion$WriteState - writestate
       // 1 x RegionCoprocessorHost - coprocessorHost
       // 1 x RegionSplitPolicy - splitPolicy
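The remaining hunks are mechanical: each file drops the org.cliffc import, picks up the HBase one, and the MapReduce jobs no longer need to ship the high-scale-lib jar. The shape of the change at a typical call site, sketched with a hypothetical holder class (only the readRequestsCount field name is taken from the HRegion comment above):

    // import org.cliffc.high_scale_lib.Counter;   // before this patch
    import org.apache.hadoop.hbase.util.Counter;   // after this patch

    class RequestTracker {  // hypothetical class, for illustration only
      final Counter readRequestsCount = new Counter();

      void onReadRequest() {
        readRequestsCount.increment();
      }

      long getReadRequestsCount() {
        return readRequestsCount.get();  // sums over all cells
      }
    }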
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 1577759)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
@@ -209,6 +209,7 @@
 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CompressionTest;
+import org.apache.hadoop.hbase.util.Counter;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -234,7 +235,6 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.ByteString;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(revision 1577759)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java	(working copy)
@@ -47,11 +47,11 @@
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
-import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.base.Preconditions;
 import org.cloudera.htrace.Trace;
 import org.cloudera.htrace.TraceScope;
+import org.apache.hadoop.hbase.util.Counter;
 
 /**
  * Thread that flushes cache on request
Index: pom.xml
===================================================================
--- pom.xml	(revision 1577759)
+++ pom.xml	(working copy)
@@ -1115,11 +1115,6 @@
         <version>${commons-cli.version}</version>
       </dependency>
       <dependency>
-        <groupId>com.github.stephenc.high-scale-lib</groupId>
-        <artifactId>high-scale-lib</artifactId>
-        <version>1.1.1</version>
-      </dependency>
-      <dependency>
         <groupId>commons-codec</groupId>
         <artifactId>commons-codec</artifactId>
         <version>${commons-codec.version}</version>
Index: src/main/docbkx/developer.xml
===================================================================
Cannot display: file marked as a binary type.
svn:mime-type = application/xml