From cd651d98b5cef710f651ab8e228afe560aa789c2 Mon Sep 17 00:00:00 2001 From: Yechao Chen Date: Mon, 11 Sep 2017 10:14:27 +0800 Subject: [PATCH] HBASE-18772 [JDK8] Replace AtomicLong with LongAdder --- .../hadoop/hbase/client/BufferedMutatorImpl.java | 15 +-- .../hbase/client/ClientAsyncPrefetchScanner.java | 10 +- .../apache/hadoop/hbase/client/FailureInfo.java | 4 +- .../hadoop/hbase/client/HTableMultiplexer.java | 9 +- .../client/PreemptiveFastFailInterceptor.java | 4 +- .../hadoop/hbase/client/TestAsyncProcess.java | 8 +- .../client/coprocessor/AggregationClient.java | 8 +- .../hadoop/hbase/IntegrationTestLazyCfLoading.java | 14 +-- ...nTestTimeBoundedRequestsWithRegionReplicas.java | 14 +-- .../hbase/mapreduce/TestWALRecordReader.java | 2 - .../org/apache/hadoop/hbase/SplitLogCounters.java | 114 ++++++++++----------- .../coordination/SplitLogWorkerCoordination.java | 5 +- .../ZKSplitLogManagerCoordination.java | 56 +++++----- .../coordination/ZkSplitLogWorkerCoordination.java | 28 ++--- .../hadoop/hbase/master/SplitLogManager.java | 12 +-- .../org/apache/hadoop/hbase/mob/MobFileCache.java | 21 ++-- .../hadoop/hbase/regionserver/ChunkCreator.java | 7 +- .../apache/hadoop/hbase/regionserver/HRegion.java | 12 +-- .../hbase/regionserver/RegionServerAccounting.java | 18 ++-- .../regionserver/handler/WALSplitterHandler.java | 2 +- .../regionserver/ReplicationSource.java | 10 +- .../regionserver/ReplicationSourceManager.java | 5 +- .../regionserver/ReplicationSourceWALReader.java | 3 +- .../java/org/apache/hadoop/hbase/tool/Canary.java | 57 ++++++----- .../hbase/replication/TestReplicationSource.java | 3 +- .../regionserver/TestGlobalThrottler.java | 3 +- .../regionserver/TestWALEntryStream.java | 4 +- .../hadoop/hbase/thrift/IncrementCoalescer.java | 22 ++-- 28 files changed, 239 insertions(+), 231 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 912c8f0..b5bbeb2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -32,7 +32,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -67,7 +68,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private final Configuration conf; private final ConcurrentLinkedQueue writeAsyncBuffer = new ConcurrentLinkedQueue<>(); - private final AtomicLong currentWriteBufferSize = new AtomicLong(0); + private final LongAdder currentWriteBufferSize = new LongAdder(); /** * Count the size of {@link BufferedMutatorImpl#writeAsyncBuffer}. * The {@link ConcurrentLinkedQueue#size()} is NOT a constant-time operation. @@ -164,19 +165,19 @@ public class BufferedMutatorImpl implements BufferedMutator { // 94-incompatible behavior, which is a timing issue because hasError, the below code // and setter of hasError are not synchronized. Perhaps it should be removed. 
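The change this patch makes is mechanical: AtomicLong.incrementAndGet() and addAndGet(x) become LongAdder.increment() and add(x), reads through get() become sum(), and set(0) becomes reset(). The sketch below illustrates the before/after pattern for a write-heavy counter; the class and field names are illustrative and not taken from the patch, and sum() is a best-effort aggregate rather than an atomic snapshot, which is fine for the monitoring-style counters converted here.

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;

    public class CounterMigrationSketch {
        // Before: every increment contends on the single AtomicLong cell.
        private final AtomicLong failedPutsOld = new AtomicLong(0);
        // After: LongAdder stripes updates across internal cells, so concurrent
        // writers contend far less; readers aggregate the cells with sum().
        private final LongAdder failedPutsNew = new LongAdder();

        void onFailure(int failedCount) {
            failedPutsOld.addAndGet(failedCount); // old style
            failedPutsNew.add(failedCount);       // new style
        }

        long totalOld() { return failedPutsOld.get(); }
        long totalNew() { return failedPutsNew.sum(); }

        public static void main(String[] args) {
            CounterMigrationSketch s = new CounterMigrationSketch();
            s.onFailure(3);
            s.onFailure(2);
            System.out.println(s.totalOld() + " / " + s.totalNew()); // prints 5 / 5
        }
    }

Not every AtomicLong is converted: fields such as maxLatency in HTableMultiplexer and the access count in MobFileCache stay AtomicLong in the hunks below, presumably because LongAdder has no compareAndSet and its update methods return nothing.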
if (ap.hasError()) { - currentWriteBufferSize.addAndGet(toAddSize); + currentWriteBufferSize.add(toAddSize); writeAsyncBuffer.addAll(ms); undealtMutationCount.addAndGet(toAddCount); backgroundFlushCommits(true); } else { - currentWriteBufferSize.addAndGet(toAddSize); + currentWriteBufferSize.add(toAddSize); writeAsyncBuffer.addAll(ms); undealtMutationCount.addAndGet(toAddCount); } // Now try and queue what needs to be queued. while (undealtMutationCount.get() != 0 - && currentWriteBufferSize.get() > writeBufferSize) { + && currentWriteBufferSize.sum() > writeBufferSize) { backgroundFlushCommits(false); } } @@ -320,7 +321,7 @@ public class BufferedMutatorImpl implements BufferedMutator { @VisibleForTesting long getCurrentWriteBufferSize() { - return currentWriteBufferSize.get(); + return currentWriteBufferSize.sum(); } @VisibleForTesting @@ -367,7 +368,7 @@ public class BufferedMutatorImpl implements BufferedMutator { throw new IllegalStateException(); } iter.remove(); - currentWriteBufferSize.addAndGet(-last.heapSize()); + currentWriteBufferSize.add(-last.heapSize()); --remainder; } }; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index e8da18f..da440c0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -54,7 +54,7 @@ import org.apache.hadoop.hbase.util.Threads; public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { private long maxCacheSize; - private AtomicLong cacheSizeInBytes; + private LongAdder cacheSizeInBytes; // exception queue (from prefetch to main scan execution) private Queue exceptionsQueue; // prefetch thread to be executed asynchronously @@ -84,7 +84,7 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { // concurrent cache maxCacheSize = resultSize2CacheSize(maxScannerResultSize); cache = new LinkedBlockingQueue<>(); - cacheSizeInBytes = new AtomicLong(0); + cacheSizeInBytes = new LongAdder(); exceptionsQueue = new ConcurrentLinkedQueue<>(); prefetcher = new Thread(new PrefetchRunnable()); Threads.setDaemonThreadRunning(prefetcher, tableName + ".asyncPrefetcher"); @@ -137,7 +137,7 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { @Override protected void addEstimatedSize(long estimatedSize) { - cacheSizeInBytes.addAndGet(estimatedSize); + cacheSizeInBytes.add(estimatedSize); } private void handleException() throws IOException { @@ -155,7 +155,7 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { } private boolean prefetchCondition() { - return cacheSizeInBytes.get() < maxCacheSize / 2; + return cacheSizeInBytes.sum() < maxCacheSize / 2; } private Result pollCache() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java index b243684..9ff8261 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private class FailureInfo { // The number of consecutive failures. - final AtomicLong numConsecutiveFailures = new AtomicLong(); + final LongAdder numConsecutiveFailures = new LongAdder(); // The time when the server started to become unresponsive // Once set, this would never be updated. final long timeOfFirstFailureMilliSec; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 0210c9b..c606c86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -37,6 +37,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -428,7 +429,7 @@ public class HTableMultiplexer { private final HRegionLocation addr; private final LinkedBlockingQueue queue; private final HTableMultiplexer multiplexer; - private final AtomicLong totalFailedPutCount = new AtomicLong(0); + private final LongAdder totalFailedPutCount = new LongAdder(); private final AtomicInteger currentProcessingCount = new AtomicInteger(0); private final AtomicAverageCounter averageLatency = new AtomicAverageCounter(); private final AtomicLong maxLatency = new AtomicLong(0); @@ -465,7 +466,7 @@ public class HTableMultiplexer { } public long getTotalFailedCount() { - return totalFailedPutCount.get(); + return totalFailedPutCount.sum(); } public long getTotalBufferedCount() { @@ -543,7 +544,7 @@ public class HTableMultiplexer { } @VisibleForTesting - AtomicLong getTotalFailedPutCount() { + LongAdder getTotalFailedPutCount() { return this.totalFailedPutCount; } @@ -659,7 +660,7 @@ public class HTableMultiplexer { + addr.getHostnamePort(), e); } finally { // Update the totalFailedCount - this.totalFailedPutCount.addAndGet(failedCount); + this.totalFailedPutCount.add(failedCount); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java index 9993742..dfadb94 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java @@ -113,7 +113,7 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : " + context.getTries()); throw new PreemptiveFastFailException( - context.getFailureInfo().numConsecutiveFailures.get(), + context.getFailureInfo().numConsecutiveFailures.sum(), context.getFailureInfo().timeOfFirstFailureMilliSec, 
context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer(), context.getGuaranteedClientSideOnly().isTrue()); @@ -155,7 +155,7 @@ class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor { FailureInfo fInfo = computeIfAbsent(repeatedFailuresMap, serverName, () -> new FailureInfo(currentTime)); fInfo.timeOfLatestAttemptMilliSec = currentTime; - fInfo.numConsecutiveFailures.incrementAndGet(); + fInfo.numConsecutiveFailures.increment(); } public void handleThrowable(Throwable t1, ServerName serverName, diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 471ed96..6137cdd 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -823,7 +823,7 @@ public class TestAsyncProcess { conn.getConfiguration().set(RequestControllerFactory.REQUEST_CONTROLLER_IMPL_CONF_KEY, SimpleRequestController.class.getName()); SimpleRequestController controller = (SimpleRequestController) ap.requestController; - controller.tasksInProgress.incrementAndGet(); + controller.tasksInProgress.increment(); final AtomicInteger ai = new AtomicInteger(controller.maxConcurrentTasksPerRegion); controller.taskCounterPerRegion.put(hri1.getRegionName(), ai); @@ -836,7 +836,7 @@ public class TestAsyncProcess { Threads.sleep(1000); Assert.assertFalse(checkPoint.get()); // TODO: this is timing-dependent ai.decrementAndGet(); - controller.tasksInProgress.decrementAndGet(); + controller.tasksInProgress.decrement(); checkPoint2.set(true); } }; @@ -946,7 +946,7 @@ public class TestAsyncProcess { ap.waitForMaximumCurrentTasks(0, null); // More time to wait if there are incorrect task count. 
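The TestAsyncProcess hunks above exercise the controller's tasksInProgress counter through increment(), decrement() and sum(). LongAdder works for this kind of in-flight gauge as well as for monotonic counters, since decrement() and negative add() are supported; the fragment below is a minimal illustration with made-up names, not code from the test.

    import java.util.concurrent.atomic.LongAdder;

    // In-flight task gauge in the style of tasksInProgress: bump on submit,
    // drop on completion, read with sum() for assertions or monitoring.
    class InFlightGaugeSketch {
        private final LongAdder tasksInProgress = new LongAdder();

        void onSubmit()   { tasksInProgress.increment(); }
        void onComplete() { tasksInProgress.decrement(); }
        long inFlight()   { return tasksInProgress.sum(); }
    }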
TimeUnit.SECONDS.sleep(1); - assertEquals(0, controller.tasksInProgress.get()); + assertEquals(0, controller.tasksInProgress.sum()); for (AtomicInteger count : controller.taskCounterPerRegion.values()) { assertEquals(0, count.get()); } @@ -996,7 +996,7 @@ public class TestAsyncProcess { @Override public void run() { Threads.sleep(sleepTime); - while (controller.tasksInProgress.get() > 0) { + while (controller.tasksInProgress.sum() > 0) { ap.decTaskCounters(Arrays.asList("dummy".getBytes()), sn); } } diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java index 7760bdc..b0a2488 100644 --- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java @@ -36,7 +36,7 @@ import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -337,15 +337,15 @@ public class AggregationClient implements Closeable { final ColumnInterpreter ci, final Scan scan) throws Throwable { final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true); class RowNumCallback implements Batch.Callback { - private final AtomicLong rowCountL = new AtomicLong(0); + private final LongAdder rowCountL = new LongAdder(); public long getRowNumCount() { - return rowCountL.get(); + return rowCountL.sum(); } @Override public void update(byte[] region, byte[] row, Long result) { - rowCountL.addAndGet(result.longValue()); + rowCountL.add(result.longValue()); } } RowNumCallback rowNum = new RowNumCallback(); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java index 648e6a6..ebb58c8 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestLazyCfLoading.java @@ -22,7 +22,7 @@ import java.security.InvalidParameterException; import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -94,8 +94,8 @@ public class IntegrationTestLazyCfLoading { private static final Map columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); - private final AtomicLong expectedNumberOfKeys = new AtomicLong(0); - private final AtomicLong totalNumberOfKeys = new AtomicLong(0); + private final LongAdder expectedNumberOfKeys = new LongAdder(); + private final LongAdder totalNumberOfKeys = new LongAdder(); public DataGenerator() { super(MIN_DATA_SIZE, MAX_DATA_SIZE); @@ -105,11 +105,11 @@ public class IntegrationTestLazyCfLoading { } public long getExpectedNumberOfKeys() { - return expectedNumberOfKeys.get(); + return expectedNumberOfKeys.sum(); } public long getTotalNumberOfKeys() { - return totalNumberOfKeys.get(); + return totalNumberOfKeys.sum(); } @Override @@ -133,9 +133,9 @@ public class IntegrationTestLazyCfLoading { // Random deterministic way to make some values "on" and others "off" for filters. 
long value = Long.parseLong(Bytes.toString(rowKey, 0, 4), 16) & ACCEPTED_VALUE; if (Bytes.BYTES_COMPARATOR.compare(cf, ESSENTIAL_CF) == 0) { - totalNumberOfKeys.incrementAndGet(); + totalNumberOfKeys.increment(); if (value == ACCEPTED_VALUE) { - expectedNumberOfKeys.incrementAndGet(); + expectedNumberOfKeys.increment(); } } return Bytes.toBytes(value); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java index 7337423..eae21f1 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/test/IntegrationTestTimeBoundedRequestsWithRegionReplicas.java @@ -24,7 +24,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.logging.Log; @@ -244,10 +244,10 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr public static class TimeBoundedMultiThreadedReader extends MultiThreadedReader { protected long timeoutNano; - protected AtomicLong timedOutReads = new AtomicLong(); + protected LongAdder timedOutReads = new LongAdder(); protected long runTime; protected Thread timeoutThread; - protected AtomicLong staleReads = new AtomicLong(); + protected LongAdder staleReads = new LongAdder(); public TimeBoundedMultiThreadedReader(LoadTestDataGenerator dataGen, Configuration conf, TableName tableName, double verifyPercent) throws IOException { @@ -277,8 +277,8 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr @Override protected String progressInfo() { StringBuilder builder = new StringBuilder(super.progressInfo()); - appendToStatus(builder, "stale_reads", staleReads.get()); - appendToStatus(builder, "get_timeouts", timedOutReads.get()); + appendToStatus(builder, "stale_reads", staleReads.sum()); + appendToStatus(builder, "get_timeouts", timedOutReads.sum()); return builder.toString(); } @@ -343,12 +343,12 @@ public class IntegrationTestTimeBoundedRequestsWithRegionReplicas extends Integr throws IOException { super.verifyResultsAndUpdateMetrics(verify, gets, elapsedNano, results, table, isNullExpected); for (Result r : results) { - if (r.isStale()) staleReads.incrementAndGet(); + if (r.isStale()) staleReads.increment(); } // we actually do not timeout and cancel the reads after timeout. We just wait for the RPC // to complete, but if the request took longer than timeout, we treat that as error. 
if (elapsedNano > timeoutNano) { - timedOutReads.incrementAndGet(); + timedOutReads.increment(); numReadFailures.addAndGet(1); // fail the test for (Result r : results) { LOG.error("FAILED FOR " + r); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java index 34725b4..5ec2855 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestWALRecordReader.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.util.List; import java.util.NavigableMap; import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -188,7 +187,6 @@ public class TestWALRecordReader { final WALFactory walfactory = new WALFactory(conf, null, getName()); WAL log = walfactory.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace()); byte [] value = Bytes.toBytes("value"); - final AtomicLong sequenceId = new AtomicLong(0); WALEdit edit = new WALEdit(); edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), System.currentTimeMillis(), value)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java index bde1b88..5127ea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogCounters.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase; * limitations under the License. */ import java.lang.reflect.Field; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -28,69 +28,69 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; */ @InterfaceAudience.Private public class SplitLogCounters { - //SplitLogManager counters - public final static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_batch_success = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_new_unexpected_wals = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_start = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_success = new AtomicLong(0); - public final static AtomicLong tot_mgr_log_split_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_result = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_result = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_delete_queued = new 
AtomicLong(0); - public final static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0); - public final static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0); - public final static AtomicLong tot_mgr_null_data = new AtomicLong(0); - public final static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0); - public final static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0); - public final static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_threshold_reached = new AtomicLong(0); - public final static AtomicLong tot_mgr_missing_state_in_delete = new AtomicLong(0); - public final static AtomicLong tot_mgr_heartbeat = new AtomicLong(0); - public final static AtomicLong tot_mgr_rescan = new AtomicLong(0); - public final static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0); - public final static AtomicLong tot_mgr_task_deleted = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0); - public final static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_dead_server_task = new AtomicLong(0); - public final static AtomicLong tot_mgr_resubmit_force = new AtomicLong(0); + //Spnager counters + public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder(); + public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder(); + public final static LongAdder tot_mgr_log_split_batch_err = new LongAdder(); + public final static LongAdder tot_mgr_new_unexpected_wals = new LongAdder(); + public final static LongAdder tot_mgr_log_split_start = new LongAdder(); + public final static LongAdder tot_mgr_log_split_success = new LongAdder(); + public final static LongAdder tot_mgr_log_split_err = new LongAdder(); + public final static LongAdder tot_mgr_node_create_queued = new LongAdder(); + public final static LongAdder tot_mgr_node_create_result = new LongAdder(); + public final static LongAdder tot_mgr_node_already_exists = new LongAdder(); + public final static LongAdder tot_mgr_node_create_err = new LongAdder(); + public final static LongAdder tot_mgr_node_create_retry = new LongAdder(); + public final static LongAdder tot_mgr_get_data_queued = new LongAdder(); + public final static LongAdder tot_mgr_get_data_result = new LongAdder(); + public final static LongAdder tot_mgr_get_data_nonode = new LongAdder(); + public final static LongAdder tot_mgr_get_data_err = new LongAdder(); + public final static LongAdder tot_mgr_get_data_retry = new LongAdder(); + public final static LongAdder tot_mgr_node_delete_queued = new LongAdder(); + public final static LongAdder tot_mgr_node_delete_result = new LongAdder(); + public final static LongAdder tot_mgr_node_delete_err = new LongAdder(); + public final static LongAdder tot_mgr_resubmit = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_failed = new LongAdder(); + public final static LongAdder tot_mgr_null_data = new LongAdder(); + public final static LongAdder tot_mgr_orphan_task_acquired = new LongAdder(); + public final static LongAdder tot_mgr_wait_for_zk_delete = new LongAdder(); + public final static LongAdder tot_mgr_unacquired_orphan_done = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_threshold_reached = new LongAdder(); + public final static 
LongAdder tot_mgr_missing_state_in_delete = new LongAdder(); + public final static LongAdder tot_mgr_heartbeat = new LongAdder(); + public final static LongAdder tot_mgr_rescan = new LongAdder(); + public final static LongAdder tot_mgr_rescan_deleted = new LongAdder(); + public final static LongAdder tot_mgr_task_deleted = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_unassigned = new LongAdder(); + public final static LongAdder tot_mgr_relist_logdir = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_dead_server_task = new LongAdder(); + public final static LongAdder tot_mgr_resubmit_force = new LongAdder(); // SplitLogWorker counters - public final static AtomicLong tot_wkr_failed_to_grab_task_no_data = new AtomicLong(0); - public final static AtomicLong tot_wkr_failed_to_grab_task_exception = new AtomicLong(0); - public final static AtomicLong tot_wkr_failed_to_grab_task_owned = new AtomicLong(0); - public final static AtomicLong tot_wkr_failed_to_grab_task_lost_race = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_acquired = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_resigned = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_done = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_err = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0); - public final static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0); - public final static AtomicLong tot_wkr_get_data_result = new AtomicLong(0); - public final static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0); - public final static AtomicLong tot_wkr_preempt_task = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0); - public final static AtomicLong tot_wkr_final_transition_failed = new AtomicLong(0); - public final static AtomicLong tot_wkr_task_grabing = new AtomicLong(0); + public final static LongAdder tot_wkr_failed_to_grab_task_no_data = new LongAdder(); + public final static LongAdder tot_wkr_failed_to_grab_task_exception = new LongAdder(); + public final static LongAdder tot_wkr_failed_to_grab_task_owned = new LongAdder(); + public final static LongAdder tot_wkr_failed_to_grab_task_lost_race = new LongAdder(); + public final static LongAdder tot_wkr_task_acquired = new LongAdder(); + public final static LongAdder tot_wkr_task_resigned = new LongAdder(); + public final static LongAdder tot_wkr_task_done = new LongAdder(); + public final static LongAdder tot_wkr_task_err = new LongAdder(); + public final static LongAdder tot_wkr_task_heartbeat = new LongAdder(); + public final static LongAdder tot_wkr_task_acquired_rescan = new LongAdder(); + public final static LongAdder tot_wkr_get_data_queued = new LongAdder(); + public final static LongAdder tot_wkr_get_data_result = new LongAdder(); + public final static LongAdder tot_wkr_get_data_retry = new LongAdder(); + public final static LongAdder tot_wkr_preempt_task = new LongAdder(); + public final static LongAdder tot_wkr_task_heartbeat_failed = new LongAdder(); + public final static LongAdder tot_wkr_final_transition_failed = new LongAdder(); + public final static LongAdder tot_wkr_task_grabing = new LongAdder(); public static void resetCounters() throws Exception { Class cl = SplitLogCounters.class; for (Field fld : cl.getDeclaredFields()) { /* Guard against source instrumentation.
*/ - if ((!fld.isSynthetic()) && (AtomicLong.class.isAssignableFrom(fld.getType()))) { - ((AtomicLong)fld.get(null)).set(0); + if ((!fld.isSynthetic()) && (LongAdder.class.isAssignableFrom(fld.getType()))) { + ((LongAdder)fld.get(null)).reset(); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java index 5b26c49..a18a3b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.coordination; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -42,7 +43,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe * {@link #isStop()} a flag indicates whether worker should finish
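The resetCounters() change above swaps AtomicLong.set(0) for LongAdder.reset() inside the same reflection loop over SplitLogCounters' static fields. A self-contained sketch of that pattern follows; the class and counter names are placeholders, not the real SplitLogCounters fields. Note that reset() is only reliable when no other thread is updating the adder concurrently, so callers must ensure the counters are quiescent when it runs.

    import java.lang.reflect.Field;
    import java.util.concurrent.atomic.LongAdder;

    public class ResetCountersSketch {
        // Stand-ins for the public static LongAdder counters in SplitLogCounters.
        public static final LongAdder tasksStarted = new LongAdder();
        public static final LongAdder tasksDone = new LongAdder();

        // Reset every static LongAdder field declared on the class, skipping
        // synthetic fields added by instrumentation, as the patched method does.
        static void resetAll(Class<?> cl) throws IllegalAccessException {
            for (Field fld : cl.getDeclaredFields()) {
                if (!fld.isSynthetic() && LongAdder.class.isAssignableFrom(fld.getType())) {
                    ((LongAdder) fld.get(null)).reset();
                }
            }
        }

        public static void main(String[] args) throws Exception {
            tasksStarted.increment();
            tasksDone.add(5);
            resetAll(ResetCountersSketch.class);
            System.out.println(tasksStarted.sum() + " " + tasksDone.sum()); // prints 0 0
        }
    }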
* {@link #registerListener()} called from {@link SplitLogWorker#run()} and could register listener * for external changes in coordination (if required)
- * {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notify coordination engine that + * {@link #endTask(SplitLogTask, LongAdder, SplitTaskDetails)} notify coordination engine that *

* Important methods for WALSplitterHandler:
* splitting task has completed. @@ -121,7 +122,7 @@ public interface SplitLogWorkerCoordination { * @param splitTaskDetails details about log split task (specific to coordination engine being * used). */ - void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails); + void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails splitTaskDetails); /** * Interface for log-split tasks Used to carry implementation details in encapsulated way through diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index 1654c67..6017317 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -206,7 +206,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements if (task.unforcedResubmits.get() >= resubmitThreshold) { if (!task.resubmitThresholdReached) { task.resubmitThresholdReached = true; - SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment(); LOG.info("Skipping resubmissions of task " + path + " because threshold " + resubmitThreshold + " reached"); } @@ -215,7 +215,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements // race with heartbeat() that might be changing last_version version = task.last_version; } else { - SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_force.increment(); version = -1; } LOG.info("resubmitting task " + path); @@ -231,7 +231,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements } task.setUnassigned(); rescan(Long.MAX_VALUE); - SplitLogCounters.tot_mgr_resubmit.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit.increment(); return true; } @@ -273,7 +273,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements .getZooKeeper() .getData(path, this.watcher, new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_queued.increment(); } /** @@ -354,7 +354,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements } private void deleteNode(String path, Long retries) { - SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_queued.increment(); // Once a task znode is ready for delete, that is it is in the TASK_DONE // state, then no one should be writing to it anymore. That is no one // will be updating the znode version any more. 
@@ -370,9 +370,9 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements task = details.getTasks().remove(path); if (task == null) { if (ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); + SplitLogCounters.tot_mgr_rescan_deleted.increment(); } - SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); + SplitLogCounters.tot_mgr_missing_state_in_delete.increment(); LOG.debug("deleted task without in memory state " + path); return; } @@ -380,7 +380,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements task.status = DELETED; task.notify(); } - SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); + SplitLogCounters.tot_mgr_task_deleted.increment(); } private void deleteNodeFailure(String path) { @@ -389,7 +389,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements } private void createRescanSuccess(String path) { - SplitLogCounters.tot_mgr_rescan.incrementAndGet(); + SplitLogCounters.tot_mgr_rescan.increment(); getDataSetWatch(path, zkretries); } @@ -416,7 +416,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode()); ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_queued.increment(); return; } @@ -434,7 +434,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements private void getDataSetWatch(String path, Long retry_count) { this.watcher.getRecoverableZooKeeper().getZooKeeper() .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); - SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_queued.increment(); } @@ -446,7 +446,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements setDone(path, SUCCESS); return; } - SplitLogCounters.tot_mgr_null_data.incrementAndGet(); + SplitLogCounters.tot_mgr_null_data.increment(); LOG.fatal("logic error - got null data " + path); setDone(path, FAILURE); return; @@ -497,17 +497,17 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements Task task = details.getTasks().get(path); if (task == null) { if (!ZKSplitLog.isRescanNode(watcher, path)) { - SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); + SplitLogCounters.tot_mgr_unacquired_orphan_done.increment(); LOG.debug("unacquired orphan task is done " + path); } } else { synchronized (task) { if (task.status == IN_PROGRESS) { if (status == SUCCESS) { - SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_success.increment(); LOG.info("Done splitting " + path); } else { - SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_err.increment(); LOG.warn("Error splitting " + path); } task.status = status; @@ -536,7 +536,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements private Task findOrCreateOrphanTask(String path) { return computeIfAbsent(details.getTasks(), path, Task::new, () -> { LOG.info("creating orphan task " + path); - SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); + SplitLogCounters.tot_mgr_orphan_task_acquired.increment(); }); } @@ -547,7 +547,7 @@ public class ZKSplitLogManagerCoordination 
extends ZooKeeperListener implements LOG.info("task " + path + " acquired by " + workerName); } task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName); - SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); + SplitLogCounters.tot_mgr_heartbeat.increment(); } else { // duplicate heartbeats - heartbeats w/o zk node version // changing - are possible. The timeout thread does @@ -898,7 +898,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements LOG.debug("failed to resubmit task " + path + " version changed"); return false; } catch (KeeperException e) { - SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_failed.increment(); LOG.warn("failed to resubmit " + path, e); return false; } @@ -947,7 +947,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx, String name) { - SplitLogCounters.tot_mgr_node_create_result.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_result.increment(); if (rc != 0) { if (needAbandonRetries(rc, "Create znode " + path)) { createNodeFailure(path); @@ -961,16 +961,16 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements // And all code pieces correctly handle the case of suddenly // disappearing task-znode. LOG.debug("found pre-existing znode " + path); - SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet(); + SplitLogCounters.tot_mgr_node_already_exists.increment(); } else { Long retry_count = (Long) ctx; LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" + retry_count); if (retry_count == 0) { - SplitLogCounters.tot_mgr_node_create_err.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_err.increment(); createNodeFailure(path); } else { - SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); + SplitLogCounters.tot_mgr_node_create_retry.increment(); createNode(path, retry_count - 1); } return; @@ -988,13 +988,13 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - SplitLogCounters.tot_mgr_get_data_result.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_result.increment(); if (rc != 0) { if (needAbandonRetries(rc, "GetData from znode " + path)) { return; } if (rc == KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_nonode.increment(); LOG.warn("task znode " + path + " vanished or not created yet."); // ignore since we should not end up in a case where there is in-memory task, // but no znode. 
The only case is between the time task is created in-memory @@ -1011,10 +1011,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path + " remaining retries=" + retry_count); if (retry_count == 0) { - SplitLogCounters.tot_mgr_get_data_err.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_err.increment(); getDataSetWatchFailure(path); } else { - SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet(); + SplitLogCounters.tot_mgr_get_data_retry.increment(); getDataSetWatch(path, retry_count - 1); } return; @@ -1036,14 +1036,14 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx) { - SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_result.increment(); if (rc != 0) { if (needAbandonRetries(rc, "Delete znode " + path)) { details.getFailedDeletions().add(path); return; } if (rc != KeeperException.Code.NONODE.intValue()) { - SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet(); + SplitLogCounters.tot_mgr_node_delete_err.increment(); Long retry_count = (Long) ctx; LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path + " remaining retries=" + retry_count); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 354f581..4c42572 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -156,7 +156,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements String taskpath = currentTask; if (taskpath != null && taskpath.equals(path)) { LOG.info("retrying data watch on " + path); - SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_retry.increment(); getDataSetWatchAsync(); } else { // no point setting a watch on the task which this worker is not @@ -169,7 +169,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements public void getDataSetWatchAsync() { watcher.getRecoverableZooKeeper().getZooKeeper() .getData(currentTask, watcher, new GetDataAsyncCallback(), null); - SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_queued.increment(); } void getDataSetWatchSuccess(String path, byte[] data) { @@ -221,12 +221,12 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements try { try { if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { - SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment(); return; } } catch (KeeperException e) { LOG.warn("Failed to get data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); return; } SplitLogTask slt; @@ -234,11 +234,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements slt = SplitLogTask.parseFrom(data); } catch (DeserializationException e) { LOG.warn("Failed parse data for znode " + path, e); - SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); + 
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment(); return; } if (!slt.isUnassigned()) { - SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment(); return; } @@ -246,7 +246,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements attemptToOwnTask(true, watcher, server.getServerName(), path, slt.getMode(), stat.getVersion()); if (currentVersion < 0) { - SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); + SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment(); return; } @@ -262,7 +262,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } LOG.info("worker " + server.getServerName() + " acquired task " + path); - SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); + SplitLogCounters.tot_wkr_task_acquired.increment(); getDataSetWatchAsync(); submitTask(path, slt.getMode(), currentVersion, reportPeriod); @@ -371,11 +371,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); if (stat == null) { LOG.warn("zk.setData() returned null for path " + task); - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); return FAILED_TO_OWN_TASK; } latestZKVersion = stat.getVersion(); - SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat.increment(); return latestZKVersion; } catch (KeeperException e) { if (!isFirstTime) { @@ -392,7 +392,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements + StringUtils.stringifyException(e1)); Thread.currentThread().interrupt(); } - SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_task_heartbeat_failed.increment(); return FAILED_TO_OWN_TASK; } @@ -440,7 +440,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements return; } } - SplitLogCounters.tot_wkr_task_grabing.incrementAndGet(); + SplitLogCounters.tot_wkr_task_grabing.increment(); synchronized (taskReadyLock) { while (seq_start == taskReadySeq.get()) { taskReadyLock.wait(checkInterval); @@ -567,7 +567,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - SplitLogCounters.tot_wkr_get_data_result.incrementAndGet(); + SplitLogCounters.tot_wkr_get_data_result.increment(); if (rc != 0) { LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); getDataSetWatchFailure(path); @@ -609,7 +609,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } catch (KeeperException e) { LOG.warn("failed to end task, " + task + " " + slt, e); } - SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); + SplitLogCounters.tot_wkr_final_transition_failed.increment(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 7e35fe8..2eca304 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -248,7 +248,7 @@ public class SplitLogManager { logDirs + " for serverName=" + serverNames); 
FileStatus[] logfiles = getFileList(logDirs, filter); status.setStatus("Checking directory contents..."); - SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_start.increment(); LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + " for " + serverNames); long t = EnvironmentEdgeManager.currentTime(); @@ -278,7 +278,7 @@ public class SplitLogManager { if (batch.done != batch.installed) { batch.isDead = true; - SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_err.increment(); LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed + " but only " + batch.done + " done"); String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; @@ -302,7 +302,7 @@ public class SplitLogManager { LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); } } - SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); + SplitLogCounters.tot_mgr_log_split_batch_success.increment(); } String msg = "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed @@ -474,7 +474,7 @@ public class SplitLogManager { } while (oldtask.status == FAILURE) { LOG.debug("wait for status of task " + path + " to change to DELETED"); - SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet(); + SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); try { oldtask.wait(); } catch (InterruptedException e) { @@ -694,7 +694,7 @@ public class SplitLogManager { } found_assigned_task = true; if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { - SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment(); if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { resubmitted++; } else { @@ -741,7 +741,7 @@ public class SplitLogManager { } } getSplitLogManagerCoordination().checkTasks(); - SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); + SplitLogCounters.tot_mgr_resubmit_unassigned.increment(); LOG.debug("resubmitting unassigned task(s) after timeout"); } Set failedDeletions = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java index 308e216..4309dd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobFileCache.java @@ -28,6 +28,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; @@ -76,9 +77,9 @@ public class MobFileCache { // caches access count private final AtomicLong count = new AtomicLong(0); private long lastAccess = 0; - private final AtomicLong miss = new AtomicLong(0); + private final LongAdder miss = new LongAdder(); private long lastMiss = 0; - private final AtomicLong evictedFileCount = new AtomicLong(0); + private final LongAdder evictedFileCount = new LongAdder(); private long lastEvictedFileCount = 0; // a lock to sync the evict to guarantee the eviction occurs in sequence. 
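In the MobFileCache hunks that follow, the access counter remains an AtomicLong while the miss and eviction counters become LongAdders, so the statistics code mixes get() with sum(). A rough sketch of that mixed pattern is below; the field and method names are illustrative and the arithmetic only mirrors the shape of the patched hitRatio/printStatistics code, not its exact API.

    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;

    class CacheStatsSketch {
        private final AtomicLong count = new AtomicLong(0); // total accesses
        private final LongAdder miss = new LongAdder();     // misses only
        private long lastAccess = 0;
        private long lastMiss = 0;

        void recordAccess(boolean hit) {
            count.incrementAndGet();
            if (!hit) {
                miss.increment();
            }
        }

        // Lifetime hit ratio: AtomicLong read with get(), LongAdder read with sum().
        double hitRatio() {
            long accesses = count.get();
            return accesses == 0 ? 0 : (double) (accesses - miss.sum()) / accesses;
        }

        // Statistics for the interval since the previous call.
        String intervalStats() {
            long access = count.get() - lastAccess;
            long missed = miss.sum() - lastMiss;
            lastAccess += access;
            lastMiss += missed;
            long hitPct = access == 0 ? 0 : 100 * (access - missed) / access;
            return "access: " + access + ", miss: " + missed + ", hit ratio: " + hitPct + "%";
        }
    }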
@@ -163,7 +164,7 @@ public class MobFileCache { for (CachedMobFile evictedFile : evictedFiles) { closeFile(evictedFile); } - evictedFileCount.addAndGet(evictedFiles.size()); + evictedFileCount.add(evictedFiles.size()); } } @@ -180,7 +181,7 @@ public class MobFileCache { CachedMobFile evictedFile = map.remove(fileName); if (evictedFile != null) { evictedFile.close(); - evictedFileCount.incrementAndGet(); + evictedFileCount.increment(); } } catch (IOException e) { LOG.error("Failed to evict the file " + fileName, e); @@ -219,7 +220,7 @@ public class MobFileCache { cached = CachedMobFile.create(fs, path, conf, cacheConf); cached.open(); map.put(fileName, cached); - miss.incrementAndGet(); + miss.increment(); } } cached.open(); @@ -294,7 +295,7 @@ public class MobFileCache { * @return The count of misses to the mob file cache. */ public long getMissCount() { - return miss.get(); + return miss.sum(); } /** @@ -302,7 +303,7 @@ public class MobFileCache { * @return The number of items evicted from the mob file cache. */ public long getEvictedFileCount() { - return evictedFileCount.get(); + return evictedFileCount.sum(); } /** @@ -310,7 +311,7 @@ public class MobFileCache { * @return The hit ratio to the mob file cache. */ public double getHitRatio() { - return count.get() == 0 ? 0 : ((float) (count.get() - miss.get())) / (float) count.get(); + return count.get() == 0 ? 0 : ((float) (count.get() - miss.sum())) / (float) count.get(); } /** @@ -318,8 +319,8 @@ public class MobFileCache { */ public void printStatistics() { long access = count.get() - lastAccess; - long missed = miss.get() - lastMiss; - long evicted = evictedFileCount.get() - lastEvictedFileCount; + long missed = miss.sum() - lastMiss; + long evicted = evictedFileCount.sum() - lastEvictedFileCount; int hitRatio = access == 0 ? 
0 : (int) (((float) (access - missed)) / (float) access * 100); LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: " + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index e818426..e314178 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -220,7 +221,7 @@ public class ChunkCreator { /** Statistics thread */ private static final int statThreadPeriod = 60 * 5; private final AtomicLong chunkCount = new AtomicLong(); - private final AtomicLong reusedChunkCount = new AtomicLong(); + private final LongAdder reusedChunkCount = new LongAdder(); MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { this.maxCount = maxCount; @@ -254,7 +255,7 @@ public class ChunkCreator { Chunk chunk = reclaimedChunks.poll(); if (chunk != null) { chunk.reset(); - reusedChunkCount.incrementAndGet(); + reusedChunkCount.increment(); } else { // Make a chunk iff we have not yet created the maxCount chunks while (true) { @@ -303,7 +304,7 @@ public class ChunkCreator { private void logStats() { if (!LOG.isDebugEnabled()) return; long created = chunkCount.get(); - long reused = reusedChunkCount.get(); + long reused = reusedChunkCount.sum(); long total = created + reused; LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + ",created chunk count=" + created diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 59b2990..67c3497 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -281,12 +281,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final LongAdder blockedRequestsCount = new LongAdder(); // Compaction LongAdders - final AtomicLong compactionsFinished = new AtomicLong(0L); - final AtomicLong compactionsFailed = new AtomicLong(0L); - final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L); - final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L); - final AtomicLong compactionsQueued = new AtomicLong(0L); - final AtomicLong flushesQueued = new AtomicLong(0L); + final LongAdder compactionsFinished = new LongAdder(); + final LongAdder compactionsFailed = new LongAdder(); + final LongAdder compactionNumFilesCompacted = new LongAdder(); + final LongAdder compactionNumBytesCompacted = new LongAdder(); + final LongAdder compactionsQueued = new LongAdder(); + final LongAdder flushesQueued = new LongAdder(); private final WAL wal; private final HRegionFileSystem fs; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index 
a41a731..18a8e25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver; import java.lang.management.MemoryType; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -38,11 +38,11 @@ import org.apache.hadoop.hbase.util.Pair; public class RegionServerAccounting { // memstore data size - private final AtomicLong globalMemstoreDataSize = new AtomicLong(0); + private final LongAdder globalMemstoreDataSize = new LongAdder(); // memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell // POJOs and entry overhead of them onto memstore. When on heap MSLAB, this will be include heap // overhead as well as the cell data size. Ya cell data is in on heap area only then. - private final AtomicLong globalMemstoreHeapSize = new AtomicLong(0); + private final LongAdder globalMemstoreHeapSize = new LongAdder(); // Store the edits size during replaying WAL. Use this to roll back the // global memstore size once a region opening failed. @@ -115,14 +115,14 @@ public class RegionServerAccounting { * @return the global Memstore data size in the RegionServer */ public long getGlobalMemstoreDataSize() { - return globalMemstoreDataSize.get(); + return globalMemstoreDataSize.sum(); } /** * @return the global memstore heap size in the RegionServer */ public long getGlobalMemstoreHeapSize() { - return this.globalMemstoreHeapSize.get(); + return this.globalMemstoreHeapSize.sum(); } /** @@ -130,13 +130,13 @@ public class RegionServerAccounting { * the global Memstore size */ public void incGlobalMemstoreSize(MemstoreSize memStoreSize) { - globalMemstoreDataSize.addAndGet(memStoreSize.getDataSize()); - globalMemstoreHeapSize.addAndGet(memStoreSize.getHeapSize()); + globalMemstoreDataSize.add(memStoreSize.getDataSize()); + globalMemstoreHeapSize.add(memStoreSize.getHeapSize()); } public void decGlobalMemstoreSize(MemstoreSize memStoreSize) { - globalMemstoreDataSize.addAndGet(-memStoreSize.getDataSize()); - globalMemstoreHeapSize.addAndGet(-memStoreSize.getHeapSize()); + globalMemstoreDataSize.add(-memStoreSize.getDataSize()); + globalMemstoreHeapSize.add(-memStoreSize.getHeapSize()); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java index 8ad150b..b204fb6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/WALSplitterHandler.java @@ -76,7 +76,7 @@ public class WALSplitterHandler extends EventHandler { SplitLogCounters.tot_wkr_task_done, splitTaskDetails); break; case PREEMPTED: - SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); + SplitLogCounters.tot_wkr_preempt_task.increment(); LOG.warn("task execution preempted " + splitTaskDetails.getWALFile()); break; case ERR: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 1a17a2e..8e15fdc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -32,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; @@ -104,7 +104,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf // id of the other cluster private UUID peerClusterId; // total number of edits we replicated - private AtomicLong totalReplicatedEdits = new AtomicLong(0); + private LongAdder totalReplicatedEdits = new LongAdder(); // The znode we currently play with protected String peerClusterZnode; // Maximum number of retries before taking bold actions @@ -126,7 +126,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf protected final ConcurrentHashMap workerThreads = new ConcurrentHashMap<>(); - private AtomicLong totalBufferUsed; + private LongAdder totalBufferUsed; public static final String WAIT_ON_ENDPOINT_SECONDS = "hbase.replication.wait.on.endpoint.seconds"; @@ -557,7 +557,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf if (throttler.isEnabled()) { throttler.addPushSize(batchSize); } - totalReplicatedEdits.addAndGet(entries.size()); - totalBufferUsed.addAndGet(-batchSize); + totalReplicatedEdits.add(entries.size()); + totalBufferUsed.add(-batchSize); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index a809832..2b7036e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -43,6 +43,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -127,7 +128,7 @@ public class ReplicationSourceManager implements ReplicationListener { private Connection connection; private long replicationWaitTime; - private AtomicLong totalBufferUsed = new AtomicLong(); + private LongAdder totalBufferUsed = new LongAdder(); /** * Creates a replication manager and sets the watch on all the other registered region servers @@ -452,7 +453,7 @@ public class ReplicationSourceManager implements ReplicationListener { } @VisibleForTesting - public AtomicLong getTotalBufferUsed() { + public LongAdder getTotalBufferUsed() { return totalBufferUsed; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 04b596c..5475893 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -28,6 +28,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -81,7 +82,7 @@ public class ReplicationSourceWALReader extends Thread { //Indicates whether this particular worker is running private boolean isReaderRunning = true; - private AtomicLong totalBufferUsed; + private LongAdder totalBufferUsed; private long totalBufferQuota; /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 56517a4..20361cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -140,8 +141,8 @@ public final class Canary implements Tool { // Simple implementation of canary sink that allows to plot on // file or standard output timings or failures. public static class StdOutSink implements Sink { - private AtomicLong readFailureCount = new AtomicLong(0), - writeFailureCount = new AtomicLong(0); + private AtomicLong readFailureCount = new AtomicLong(0); + private LongAdder writeFailureCount = new LongAdder(); private Map readFailures = new ConcurrentHashMap<>(); private Map writeFailures = new ConcurrentHashMap<>(); @@ -192,18 +193,18 @@ public final class Canary implements Tool { @Override public long getWriteFailureCount() { - return writeFailureCount.get(); + return writeFailureCount.sum(); } @Override public void publishWriteFailure(ServerName serverName, HRegionInfo region, Exception e) { - writeFailureCount.incrementAndGet(); + writeFailureCount.increment(); LOG.error(String.format("write to region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e); } @Override public void publishWriteFailure(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, Exception e) { - writeFailureCount.incrementAndGet(); + writeFailureCount.increment(); LOG.error(String.format("write to region %s on regionserver %s column family %s failed", region.getRegionNameAsString(), serverName, column.getNameAsString()), e); } @@ -250,24 +251,24 @@ public final class Canary implements Tool { public static class RegionStdOutSink extends StdOutSink { - private Map perTableReadLatency = new HashMap<>(); - private AtomicLong writeLatency = new AtomicLong(); + private Map perTableReadLatency = new HashMap<>(); + private LongAdder writeLatency = new LongAdder(); - public Map getReadLatencyMap() { + public Map getReadLatencyMap() { return this.perTableReadLatency; } - public AtomicLong initializeAndGetReadLatencyForTable(String tableName) { - AtomicLong initLatency = new AtomicLong(0L); + public LongAdder initializeAndGetReadLatencyForTable(String tableName) { + LongAdder initLatency = new 
LongAdder(); this.perTableReadLatency.put(tableName, initLatency); return initLatency; } public void initializeWriteLatency() { - this.writeLatency.set(0L); + this.writeLatency.reset(); } - public AtomicLong getWriteLatency() { + public LongAdder getWriteLatency() { return this.writeLatency; } } @@ -323,10 +324,10 @@ public final class Canary implements Tool { private TaskType taskType; private boolean rawScanEnabled; private ServerName serverName; - private AtomicLong readWriteLatency; + private LongAdder readWriteLatency; RegionTask(Connection connection, HRegionInfo region, ServerName serverName, RegionStdOutSink sink, - TaskType taskType, boolean rawScanEnabled, AtomicLong rwLatency) { + TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) { this.connection = connection; this.region = region; this.serverName = serverName; @@ -414,7 +415,7 @@ public final class Canary implements Tool { rs.next(); } stopWatch.stop(); - this.readWriteLatency.addAndGet(stopWatch.getTime()); + this.readWriteLatency.add(stopWatch.getTime()); sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); } catch (Exception e) { sink.publishReadFailure(serverName, region, column, e); @@ -466,7 +467,7 @@ public final class Canary implements Tool { long startTime = System.currentTimeMillis(); table.put(put); long time = System.currentTimeMillis() - startTime; - this.readWriteLatency.addAndGet(time); + this.readWriteLatency.add(time); sink.publishWriteTiming(serverName, region, column, time); } catch (Exception e) { sink.publishWriteFailure(serverName, region, column, e); @@ -489,10 +490,10 @@ public final class Canary implements Tool { private String serverName; private HRegionInfo region; private ExtendedSink sink; - private AtomicLong successes; + private LongAdder successes; RegionServerTask(Connection connection, String serverName, HRegionInfo region, - ExtendedSink sink, AtomicLong successes) { + ExtendedSink sink, LongAdder successes) { this.connection = connection; this.serverName = serverName; this.region = region; @@ -540,14 +541,14 @@ public final class Canary implements Tool { s.close(); stopWatch.stop(); } - successes.incrementAndGet(); + successes.increment(); sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); } catch (TableNotFoundException tnfe) { LOG.error("Table may be deleted", tnfe); // This is ignored because it doesn't imply that the regionserver is dead } catch (TableNotEnabledException tnee) { // This is considered a success since we got a response. - successes.incrementAndGet(); + successes.increment(); LOG.debug("The targeted table was disabled. 
Assuming success."); } catch (DoNotRetryIOException dnrioe) { sink.publishReadFailure(tableName.getNameAsString(), serverName); @@ -1049,7 +1050,7 @@ public final class Canary implements Tool { } this.initialized = true; for (String table : tables) { - AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table); + LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ, this.rawScanEnabled, readLatency)); } @@ -1068,7 +1069,7 @@ public final class Canary implements Tool { } // sniff canary table with write operation regionSink.initializeWriteLatency(); - AtomicLong writeTableLatency = regionSink.getWriteLatency(); + LongAdder writeTableLatency = regionSink.getWriteLatency(); taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency)); } @@ -1080,7 +1081,7 @@ public final class Canary implements Tool { LOG.error("Sniff region failed!", e); } } - Map actualReadTableLatency = regionSink.getReadLatencyMap(); + Map actualReadTableLatency = regionSink.getReadLatencyMap(); for (Map.Entry entry : configuredReadTableTimeouts.entrySet()) { String tableName = entry.getKey(); if (actualReadTableLatency.containsKey(tableName)) { @@ -1167,7 +1168,7 @@ public final class Canary implements Tool { for (HTableDescriptor table : admin.listTables()) { if (admin.isTableEnabled(table.getTableName()) && (!table.getTableName().equals(writeTableName))) { - AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString()); + LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString()); taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled, readLatency)); } } @@ -1235,7 +1236,7 @@ public final class Canary implements Tool { * @throws Exception */ private static List> sniff(final Admin admin, final Sink sink, String tableName, - ExecutorService executor, TaskType taskType, boolean rawScanEnabled, AtomicLong readLatency) throws Exception { + ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s", tableName)); @@ -1254,7 +1255,7 @@ public final class Canary implements Tool { */ private static List> sniff(final Admin admin, final Sink sink, HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType, - boolean rawScanEnabled, AtomicLong rwLatency) throws Exception { + boolean rawScanEnabled, LongAdder rwLatency) throws Exception { if (LOG.isDebugEnabled()) { LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName())); @@ -1406,11 +1407,11 @@ public final class Canary implements Tool { private void monitorRegionServers(Map> rsAndRMap) { List tasks = new ArrayList<>(); - Map successMap = new HashMap<>(); + Map successMap = new HashMap<>(); Random rand = new Random(); for (Map.Entry> entry : rsAndRMap.entrySet()) { String serverName = entry.getKey(); - AtomicLong successes = new AtomicLong(0); + LongAdder successes = new LongAdder(); successMap.put(serverName, successes); if (entry.getValue().isEmpty()) { LOG.error(String.format("Regionserver not serving any regions - %s", serverName)); diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java index c3b7eaf..cb0c8bb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -165,7 +166,7 @@ public class TestReplicationSource { Configuration testConf = HBaseConfiguration.create(); testConf.setInt("replication.source.maxretriesmultiplier", 1); ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class); - Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong()); + Mockito.when(manager.getTotalBufferUsed()).thenReturn(new LongAdder()); source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null, replicationEndpoint, null); ExecutorService executor = Executors.newSingleThreadExecutor(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java index 2469c7c..2be38c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalThrottler.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -129,7 +130,7 @@ public class TestGlobalThrottler { Thread watcher = new Thread(()->{ Replication replication = (Replication)utility1.getMiniHBaseCluster() .getRegionServer(0).getReplicationSourceService(); - AtomicLong bufferUsed = replication.getReplicationManager().getTotalBufferUsed(); + LongAdder bufferUsed = replication.getReplicationManager().getTotalBufferUsed(); testQuotaPass = true; while (!Thread.interrupted()) { long size = bufferUsed.get(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java index ebbdef1..ab23166 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java @@ -33,7 +33,7 @@ import java.util.NavigableMap; import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -347,7 +347,7 @@ public class TestWALEntryStream { // start up a batcher ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); - when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); + 
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new LongAdder()); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceMetrics()).thenReturn(new MetricsSource("1")); diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java index 221786a..edd251d 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java @@ -30,7 +30,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -150,9 +150,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { } } - private final AtomicLong failedIncrements = new AtomicLong(); - private final AtomicLong successfulCoalescings = new AtomicLong(); - private final AtomicLong totalIncrements = new AtomicLong(); + private final LongAdder failedIncrements = new LongAdder(); + private final LongAdder successfulCoalescings = new LongAdder(); + private final LongAdder totalIncrements = new LongAdder(); private final ConcurrentMap countersMap = new ConcurrentHashMap<>(100000, 0.75f, 1500); private final ThreadPoolExecutor pool; @@ -176,7 +176,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { public boolean queueIncrement(TIncrement inc) throws TException { if (!canQueue()) { - failedIncrements.incrementAndGet(); + failedIncrements.increment(); return false; } return internalQueueTincrement(inc); @@ -184,7 +184,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { public boolean queueIncrements(List incs) throws TException { if (!canQueue()) { - failedIncrements.incrementAndGet(); + failedIncrements.increment(); return false; } @@ -211,7 +211,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { //Make sure that the number of threads is scaled. dynamicallySetCoreSize(countersMapSize); - totalIncrements.incrementAndGet(); + totalIncrements.increment(); FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual); @@ -224,7 +224,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { value = Long.valueOf(currentAmount); } else { value += currentAmount; - successfulCoalescings.incrementAndGet(); + successfulCoalescings.increment(); } // Try to put the value, only if there was none Long oldValue = countersMap.putIfAbsent(key, value); @@ -354,15 +354,15 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { pool.setMaximumPoolSize(newMaxSize); } public long getFailedIncrements() { - return failedIncrements.get(); + return failedIncrements.sum(); } public long getSuccessfulCoalescings() { - return successfulCoalescings.get(); + return successfulCoalescings.sum(); } public long getTotalIncrements() { - return totalIncrements.get(); + return totalIncrements.sum(); } public long getCountersMapSize() { -- 2.7.2.windows.1