From f021063cda0d3872cbd04703a5ca7e79afeb6cfc Mon Sep 17 00:00:00 2001 From: Mohit Goel Date: Thu, 5 Jul 2018 20:16:15 -0700 Subject: [PATCH] HBASE-20874 Sending compaction descriptions from all regionservers to master. Added getCompactions() to Admin API and 'compactions' shell command. Sending list of Compactions with heartbeat. Introduced CompactionService in Master. --- .../java/org/apache/hadoop/hbase/client/Admin.java | 18 ++ .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 17 ++ .../hadoop/hbase/client/AsyncHBaseAdmin.java | 12 +- .../org/apache/hadoop/hbase/client/Compaction.java | 244 +++++++++++++++++++++ .../hadoop/hbase/client/CompactionState.java | 3 + .../hbase/client/ConnectionImplementation.java | 7 + .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 32 +++ .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 25 +++ .../hbase/client/ShortCircuitMasterConnection.java | 9 + .../hbase/shaded/protobuf/RequestConverter.java | 6 + .../src/main/protobuf/ClusterStatus.proto | 42 ++++ .../src/main/protobuf/Master.proto | 17 ++ .../org/apache/hadoop/hbase/master/HMaster.java | 12 + .../hadoop/hbase/master/MasterRpcServices.java | 41 +++- .../apache/hadoop/hbase/master/MasterServices.java | 6 + .../hbase/master/compaction/CompactionService.java | 39 ++++ .../master/compaction/CompactionServiceImpl.java | 75 +++++++ .../hadoop/hbase/regionserver/CompactSplit.java | 101 ++++++++- .../hadoop/hbase/regionserver/HRegionServer.java | 4 + .../RunningTasksThreadPoolExecutor.java | 64 ++++++ .../hbase/master/MockNoopMasterServices.java | 6 + .../hadoop/hbase/master/TestMasterMetrics.java | 10 +- .../hadoop/hbase/regionserver/TestCompaction.java | 81 ++++++- hbase-shell/src/main/ruby/hbase/admin.rb | 13 ++ hbase-shell/src/main/ruby/shell.rb | 1 + .../src/main/ruby/shell/commands/compactions.rb | 55 +++++ 26 files changed, 921 insertions(+), 19 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/Compaction.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionService.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionServiceImpl.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RunningTasksThreadPoolExecutor.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/compactions.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index a43a0b2dc0faec561804e97d9ec1707885423a87..51fff238271f22504eb743cc6d91819df1fc21fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -1866,6 +1866,24 @@ public interface Admin extends Abortable, Closeable { */ long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException; + /** + * Get compaction status of the specified regionservers. On a large cluster with lots of + * compactions running, the returned data set may be large. + * + * @param serverNames list of all region servers. + * @return the list of compactions with servername as key. + * @throws IOException if a remote or network exception occurs + */ + Map> getCompactions(List serverNames) throws IOException; + + /** + * Get compaction status of all the regionservers. On a large cluster with lots of compactions + * running, the returned data set may be large. 
+ * + * @return the list of compactions with servername as key. + */ + Map> getCompactions() throws IOException; + /** * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be * taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 739c78a46ddce01c39a2c291fab74d8afbc79807..19f51029454abdcb0df7f7314d4cb9cbd5fcb889 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1089,6 +1089,23 @@ public interface AsyncAdmin { */ CompletableFuture> getLastMajorCompactionTimestampForRegion(byte[] regionName); + /** + * Get compaction status of the regionservers. On a large cluster with lots of compactions + * running,the returned data set may be large. + * + * @param serverNames list of all region servers. + * @return the list of compactions with servername as key. + */ + CompletableFuture>> getCompactions(List serverNames); + + /** + * Get compaction status of all the regionservers. On a large cluster with lots of compactions + * running,the returned data set may be large. + * + * @return the list of compactions with servername as key. + */ + CompletableFuture>> getCompactions(); + /** * @return the list of supported security capabilities. The return value will be wrapped by a * {@link CompletableFuture}. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 39eda07df32f6f93234094905efd31024f1acb3f..13f5fa79c37553137c9a3e2308756ede18b8cdfb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -27,7 +27,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.regex.Pattern; - import org.apache.hadoop.hbase.CacheEvictionStats; import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.ClusterMetrics.Option; @@ -667,6 +666,17 @@ class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.getLastMajorCompactionTimestampForRegion(regionName)); } + @Override + public CompletableFuture>> getCompactions(List + serverNames) { + return wrap(rawAdmin.getCompactions(serverNames)); + } + + @Override + public CompletableFuture>> getCompactions() { + return wrap(rawAdmin.getCompactions()); + } + @Override public CompletableFuture balancerSwitch(boolean on) { return wrap(rawAdmin.balancerSwitch(on)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Compaction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Compaction.java new file mode 100644 index 0000000000000000000000000000000000000000..56e6096f60d7cfef62632a98a727b00a9a2c4adf --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Compaction.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.CompactionExecutionStateProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.CompactionProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.CompactionTypeProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CompactionResponseProto; + +/** + * This class holds all information regarding a compaction running or queued on RegionServer.Its + * an immutable class. Use CompactionBuilder to construct its objects. + */ +@InterfaceAudience.Private +public class Compaction { + + private static final Logger LOG = LoggerFactory.getLogger(Compaction.class.getName()); + public enum CompactionExecutionState {RUNNING, QUEUED} + private final CompactionExecutionState compactionExecutionState; + private final RegionInfo regionInfo; + private final String storeName; + /** The total compacting key values in currently running compaction */ + private final long totalCompactingCells; + /** The completed count of key values in currently running compaction. Determines how many KV's + * have been compacted so far during the compaction. */ + private final long currentCompactedCells; + /** The completed size of data processed by the currently running compaction, in bytes. + * Determines how many bytes have been compacted so far during the compaction. 
*/ + private final long totalCompactedSize; + private final CompactionState compactionState; + /** Internal thread pools handling the compactions, either long,short */ + private final String poolType; + private final List fileToCompact; + + Compaction(CompactionExecutionState compactionExecutionState, RegionInfo regionInfo, String + storeName, long totalCompactingCells, long currentCompactedCells, + long totalCompactedSize, CompactionState compactionState, String poolType, List + fileToCompact) { + this.compactionExecutionState = compactionExecutionState; + this.regionInfo = regionInfo; + this.storeName = storeName; + this.totalCompactingCells = totalCompactingCells; + this.currentCompactedCells = currentCompactedCells; + this.totalCompactedSize = totalCompactedSize; + this.compactionState = compactionState; + this.poolType = poolType; + this.fileToCompact = fileToCompact; + } + + public CompactionExecutionState getCompactionExecutionState() { + return compactionExecutionState; + } + + public RegionInfo getRegionInfo() { + return regionInfo; + } + + public String getStoreName() { + return storeName; + } + + public long getTotalCompactingCells() { + return totalCompactingCells; + } + + public long getCurrentCompactedCells() { + return currentCompactedCells; + } + + public long getTotalCompactedSize() { + return totalCompactedSize; + } + + public CompactionState getCompactionState() { + return compactionState; + } + + public String getPoolType() { + return poolType; + } + + public List getFileToCompact() { + return fileToCompact; + } + + public static List fromProtos(List + compactionProtoList) { + List compactionList = new ArrayList<>(compactionProtoList.size()); + for (CompactionProto compactionProto : compactionProtoList) { + CompactionBuilder compactionBuilder = new CompactionBuilder(); + compactionBuilder.setStoreName(compactionProto.getStoreName()). + setRegionInfo(ProtobufUtil.toRegionInfo(compactionProto.getRegionInfo())). + setFileToCompact(compactionProto.getFileToCompactList()). + setCompactionState(compactionProto.getCompactionType() == + CompactionTypeProto.MAJOR ? CompactionState.MAJOR : + CompactionState.MINOR). + setTotalCompactingCells(compactionProto.getTotalCompactingCells()). + setCurrentCompactedCells(compactionProto.getCurrentCompactedCells()). + setTotalCompactedSize(compactionProto.getTotalCompactedSize()). + setCompactionExecutionState(compactionProto + .getCompactionState() == CompactionExecutionStateProto.QUEUED ? + CompactionExecutionState.QUEUED : CompactionExecutionState.RUNNING). + setPoolType(compactionProto.getPool()); + compactionList.add(compactionBuilder.createCompaction()); + } + return compactionList; + } + + public static List toProtos(List + compactionSummaries) { + List compactionProtoList = new ArrayList<>(compactionSummaries.size()); + for (Compaction summary : compactionSummaries) { + CompactionProto.Builder compactionProto = CompactionProto.newBuilder(); + compactionProto + .setCompactionState(summary.getCompactionExecutionState() == + CompactionExecutionState.QUEUED ? 
CompactionExecutionStateProto.QUEUED : + CompactionExecutionStateProto.RUNNING) + .setRegionInfo(ProtobufUtil.toRegionInfo(summary.getRegionInfo())) + .addAllFileToCompact(summary.getFileToCompact()) + .setStoreName(summary.getStoreName()) + .setTotalCompactingCells(summary.getTotalCompactingCells()) + .setCurrentCompactedCells(summary.getCurrentCompactedCells()) + .setTotalCompactedSize(summary.getTotalCompactedSize()) + .setCompactionType(summary.getCompactionState() == CompactionState + .MAJOR ? CompactionTypeProto.MAJOR : CompactionTypeProto.MINOR) + .setPool(summary.getPoolType()); + compactionProtoList.add(compactionProto.build()); + } + return compactionProtoList; + } + + public static Map> toServerCompactionsMap( + List compactionResponseProtos) { + Map> serverCompactions = new HashMap<>(compactionResponseProtos. + size()); + for (CompactionResponseProto compactionResponseProto : compactionResponseProtos) { + serverCompactions.put(ProtobufUtil.toServerName(compactionResponseProto.getServerName()), + Compaction.fromProtos(compactionResponseProto.getCompactionList())); + } + return serverCompactions; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + return sb.append("compactionExecutionState=").append(compactionExecutionState) + .append(", storeName='").append(storeName) + .append(", totalCompactingCells=").append(totalCompactingCells) + .append(", currentCompactedCells=").append(currentCompactedCells) + .append(", totalCompactedSize=").append(totalCompactedSize) + .append(", compactionState=").append(compactionState) + .append(", poolType='").append(poolType).append('}') + .append(", fileToCompact='").append(fileToCompact).toString(); + } + + /** + * Builder class for Compaction + */ + public static class CompactionBuilder { + private CompactionExecutionState compactionExecutionState; + private RegionInfo regionInfo; + private String storeName; + private long totalCompactingCells; + private long currentCompactedCells; + private long totalCompactedSize; + private CompactionState compactionState; + private String poolType; + private List fileToCompact = Collections.EMPTY_LIST; + + public CompactionBuilder setCompactionExecutionState(CompactionExecutionState + compactionExecutionState) { + this.compactionExecutionState = compactionExecutionState; + return this; + } + + public CompactionBuilder setRegionInfo(RegionInfo regionInfo) { + this.regionInfo = regionInfo; + return this; + } + + public CompactionBuilder setStoreName(String storeName) { + this.storeName = storeName; + return this; + } + + public CompactionBuilder setTotalCompactingCells(long totalCompactingCells) { + this.totalCompactingCells = totalCompactingCells; + return this; + } + + public CompactionBuilder setCurrentCompactedCells(long currentCompactedCells) { + this.currentCompactedCells = currentCompactedCells; + return this; + } + + public CompactionBuilder setTotalCompactedSize(long totalCompactedSize) { + this.totalCompactedSize = totalCompactedSize; + return this; + } + + public CompactionBuilder setCompactionState(CompactionState compactionState) { + this.compactionState = compactionState; + return this; + } + + public CompactionBuilder setPoolType(String poolType) { + this.poolType = poolType; + return this; + } + + public CompactionBuilder setFileToCompact(List fileToCompact) { + this.fileToCompact = fileToCompact; + return this; + } + + public Compaction createCompaction() { + return new Compaction(compactionExecutionState, regionInfo, storeName, + totalCompactingCells, 
currentCompactedCells, totalCompactedSize, compactionState, + poolType, fileToCompact); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompactionState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompactionState.java index 51f7d071e4acc90cb4e242135e73acef692e3ce1..b48c468ea67c75a4cf15042858df6f5c12307d53 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompactionState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CompactionState.java @@ -22,6 +22,9 @@ import org.apache.yetus.audience.InterfaceAudience; /** * POJO representing the compaction state */ +/* +TODO : This should be called CompactionType not CompactionState. + */ @InterfaceAudience.Public public enum CompactionState { NONE, MINOR, MAJOR, MAJOR_AND_MINOR diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 7e07dafa7e70b5767cffaf8b9cd4b8ae56d58e32..b8b37e3b04555e456d61adb94261f6b9037ea6ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1620,6 +1620,13 @@ class ConnectionImplementation implements ClusterConnection, Closeable { return stub.getLastMajorCompactionTimestampForRegion(controller, request); } + @Override + public MasterProtos.GetCompactionResponse getCompactions( + RpcController controller, MasterProtos.GetCompactionRequest request) + throws ServiceException { + return stub.getCompactions(controller, request); + } + @Override public IsBalancerEnabledResponse isBalancerEnabled(RpcController controller, IsBalancerEnabledRequest request) throws ServiceException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 856b11848bd1c6b8f392b4afc71373467c7d2b49..aa9fb4e0e3099b8890b5138a7ae808d138a5cade 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -137,6 +138,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnR import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AddColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.AssignRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CompactionResponseProto; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; @@ -3268,6 +3270,36 @@ public class HBaseAdmin implements Admin { }); } + @Override + public Map> getCompactions(List serverNames) throws + IOException { + return executeCallable(new MasterCallable>>(getConnection(), + getRpcControllerFactory()) { + @Override + protected Map> rpcCall() 
throws Exception { + List compactionResponseProtos = master.getCompactions( + getRpcController(), RequestConverter.buildGetCompactionRequest(serverNames)) + .getCompactionResponseList(); + return Compaction.toServerCompactionsMap(compactionResponseProtos); + } + }); + } + + @Override + public Map> getCompactions() throws + IOException { + return executeCallable(new MasterCallable>>(getConnection(), + getRpcControllerFactory()) { + @Override + protected Map> rpcCall() throws Exception { + List compactionResponseProtos = master.getCompactions( + getRpcController(), RequestConverter.buildGetCompactionRequest(Collections.EMPTY_LIST)) + .getCompactionResponseList(); + return Compaction.toServerCompactionsMap(compactionResponseProtos); + } + }); + } + /** * {@inheritDoc} */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 4f73909c31d614d29130c91f9de27f85fbae8580..f38039a8f6ea98beb7e2e736b886df10a002917c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -159,6 +159,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompactionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompactionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; @@ -3081,6 +3083,29 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { s.compactionSwitch(c, req, done), resp -> resp.getPrevState())).call(); } + @Override + public CompletableFuture>> getCompactions(List + serverNames) { + return this.>>newMasterCaller() + .action((controller, stub) -> this.>>call(controller, stub, + RequestConverter.buildGetCompactionRequest(serverNames) , (s, c, req, + done) -> s.getCompactions(c, req, done), + resp -> Compaction.toServerCompactionsMap(resp.getCompactionResponseList()))) + .call(); + } + + @Override + public CompletableFuture>> getCompactions() { + return this.>>newMasterCaller() + .action((controller, stub) -> this.>>call(controller, stub, + RequestConverter.buildGetCompactionRequest(Collections.EMPTY_LIST), (s, c, req, + done) -> s.getCompactions(c, req, done), + resp -> Compaction.toServerCompactionsMap(resp.getCompactionResponseList()))) + .call(); + } + @Override public CompletableFuture balancerSwitch(final boolean on) { return this diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 7bb65d2024244d9483b41c6ffae3efbcddd006a3..ec1ab8d708f8453b3731859d3ced784bbff16ad0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompactionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompactionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; @@ -444,6 +446,13 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { return stub.getLastMajorCompactionTimestamp(controller, request); } + @Override + public GetCompactionResponse getCompactions(RpcController controller, + GetCompactionRequest request) + throws ServiceException { + return stub.getCompactions(controller, request); + } + @Override public GetCompletedSnapshotsResponse getCompletedSnapshots(RpcController controller, GetCompletedSnapshotsRequest request) throws ServiceException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index db07bab46f17c2eed05a663aec4762dafd01aa25..61eb86c6d92250095ff3eea7b37220d40300afe8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -109,6 +109,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.DisableTab import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.EnableTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompactionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; @@ -1862,6 +1863,11 @@ public final class RequestConverter { return builder.setServerName(ProtobufUtil.toServerName(server)).build(); } + public static GetCompactionRequest buildGetCompactionRequest(List servers) { + GetCompactionRequest.Builder builder = GetCompactionRequest.newBuilder(); + return builder.addAllServerName(toProtoServerNames(servers)).build(); + } + private static List toProtoServerNames(List servers) { List pbServers = new ArrayList<>(servers.size()); for (ServerName server : servers) { diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto index 399ff5ea966693a6721280703c5dddd8e437dcf5..05cce08c2eba47d66dd3e87b2f3702bc65907bf5 100644 --- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto +++ 
b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto @@ -143,6 +143,44 @@ message RegionLoad { /** the current total coprocessor requests made to region */ optional uint64 cp_requests_count = 20; } +//This is confusing, as corresponding POJO CompactionState is mis-named +enum CompactionTypeProto { + MAJOR=0; + MINOR=1; +} + +enum CompactionExecutionStateProto { + RUNNING=0; + QUEUED=1; +} + +message CompactionProto { + + optional RegionInfo region_info = 1; + + optional string store_name = 2; + + /** The total compacting key values in currently running compaction */ + optional uint64 total_compacting_cells = 4; + + /** The completed count of key values in currently running compaction. Determines how many KV's + * have been compacted so far during the compaction. */ + optional uint64 current_compacted_cells = 5; + + /** The completed size of data processed by the currently running compaction, in bytes. + * Determines how many bytes have been compacted so far during the compaction. */ + optional uint64 total_compacted_size = 6; + + optional CompactionExecutionStateProto compaction_state = 7; + + optional CompactionTypeProto compaction_type = 8; + + /** Internal thread pools handling the compactions, either long,short */ + optional string pool = 9; + + /** Could be empty , as CompactionContext hasn't been created still */ + repeated string fileToCompact = 10; +} /* Server-level protobufs */ @@ -210,6 +248,10 @@ message ServerLoad { * The replicationLoadSink for the replication Sink status of this region server. */ optional ReplicationLoadSink replLoadSink = 11; + + /** Information regarding compactions on regionservers. */ + repeated CompactionProto compaction = 12; + } message LiveServerInfo { diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index c2ab18017f4bc2319180d3490792a21e2808d61d..db687e2e656436abb4478cb0c99dcf7e79fc2935 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -587,6 +587,19 @@ message MajorCompactionTimestampResponse { required int64 compaction_timestamp = 1; } +message CompactionResponseProto { + required ServerName server_name = 1; + repeated CompactionProto compaction = 2; +} + +message GetCompactionRequest { + repeated ServerName server_name = 1; +} + +message GetCompactionResponse { + repeated CompactionResponseProto compactionResponse = 1; +} + message SecurityCapabilitiesRequest { } @@ -916,6 +929,10 @@ service MasterService { rpc getLastMajorCompactionTimestampForRegion(MajorCompactionTimestampForRegionRequest) returns(MajorCompactionTimestampResponse); + /** Returns a list of Compactions status on the cluster */ + rpc GetCompactions(GetCompactionRequest) + returns(GetCompactionResponse); + rpc getProcedureResult(GetProcedureResultRequest) returns(GetProcedureResultResponse); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b7148d5884220a8231bb13fa2022a62778e09580..ddf85c1e2aba7959a0518bd41a46b89ebea08ec2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -112,6 +112,8 @@ import org.apache.hadoop.hbase.master.cleaner.CleanerChore; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import 
org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner; +import org.apache.hadoop.hbase.master.compaction.CompactionService; +import org.apache.hadoop.hbase.master.compaction.CompactionServiceImpl; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan; import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType; @@ -323,6 +325,8 @@ public class HMaster extends HRegionServer implements MasterServices { private ClusterSchemaService clusterSchemaService; + private CompactionService compactionService; + public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = "hbase.master.wait.on.service.seconds"; public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60; @@ -996,6 +1000,8 @@ public class HMaster extends HRegionServer implements MasterServices { status.setStatus("Starting cluster schema service"); initClusterSchemaService(); + compactionService = new CompactionServiceImpl(this); + if (this.cpHost != null) { try { this.cpHost.preMasterInitialization(); @@ -3762,4 +3768,10 @@ public class HMaster extends HRegionServer implements MasterServices { public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() { return this.syncReplicationReplayWALManager; } + + @Override + public CompactionService getCompactionService() { + return compactionService; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index a4d9ff84f0a151ffb204f7f8a90b636c8c17d67c..659d636cd729e96e122c9369b0806a3d0e20fd97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceReq import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CompactionResponseProto; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; @@ -159,6 +160,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProced import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompactionRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompactionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; @@ -470,12 +473,13 @@ public class MasterRpcServices extends RSRpcServices 
ServerMetrics newLoad = ServerMetricsBuilder.toServerMetrics(serverName, versionNumber, version, sl); master.getServerManager().regionServerReport(serverName, newLoad); - master.getAssignmentManager().reportOnlineRegions(serverName, - newLoad.getRegionMetrics().keySet()); - if (sl != null && master.metricsMaster != null) { + master.getAssignmentManager() + .reportOnlineRegions(serverName, newLoad.getRegionMetrics().keySet()); + if (sl != null && master.metricsMaster != null && master.getCompactionService()!= null) { // Up our metrics. - master.metricsMaster.incrementRequests( - sl.getTotalNumberOfRequests() - (oldLoad != null ? oldLoad.getRequestCount() : 0)); + master.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests() + - (oldLoad != null ? oldLoad.getRequestCount() : 0)); + master.getCompactionService().setCompactionProtoList(serverName,sl.getCompactionList()); } } catch (IOException ioe) { throw new ServiceException(ioe); @@ -1611,6 +1615,33 @@ public class MasterRpcServices extends RSRpcServices return response.build(); } + @Override + public GetCompactionResponse getCompactions(RpcController controller, + GetCompactionRequest request) + throws ServiceException { + GetCompactionResponse.Builder response = GetCompactionResponse.newBuilder(); + try { + master.checkInitialized(); + List serverNamesProtos = request.getServerNameList(); + + if (serverNamesProtos.isEmpty()) { + serverNamesProtos = master.getServerManager().getOnlineServersList() + .stream().map(serverName -> ProtobufUtil + .toServerName(serverName)).collect(Collectors.toList()); + } + + for (HBaseProtos.ServerName serverName : serverNamesProtos) { + CompactionResponseProto.Builder builder = CompactionResponseProto.newBuilder(); + builder.setServerName(serverName).addAllCompaction(master + .getCompactionService().getCompactionProtoList(ProtobufUtil.toServerName(serverName))); + response.addCompactionResponse(builder.build()); + } + } catch (IOException e) { + throw new ServiceException(e); + } + return response.build(); + } + /** * Compact a region on the master. * diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 7b0c56a924ee6ad797a973557161d25bf0eb4486..44a5a37e4bf34e6327784dce77116014f98f5fcc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.compaction.CompactionService; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -144,6 +145,11 @@ public interface MasterServices extends Server { */ MetricsMaster getMasterMetrics(); + /** + * @return Master's instance of {@link CompactionService} + */ + CompactionService getCompactionService(); + /** * Check table is modifiable; i.e. exists and is offline. * @param tableName Name of table to check. 
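The two new files below carry the master-side service. In sketch form, the bookkeeping they implement amounts to a per-server cache that the heartbeat path writes and the client RPC path reads; the sketch below is hedged and simplified (String stands in for ServerName, and for the List of ClusterStatusProtos.CompactionProto the real service stores) and is not part of the applied diff.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class CompactionCacheSketch {
      // Latest compaction snapshot per regionserver, replaced wholesale on each heartbeat.
      private final Map<String, List<String>> latestByServer = new ConcurrentHashMap<>();

      // Heartbeat path: in the patch, MasterRpcServices#regionServerReport hands the
      // ServerLoad's compaction list to CompactionService#setCompactionProtoList.
      void onHeartbeat(String serverName, List<String> compactions) {
        latestByServer.put(serverName, compactions);
      }

      // Client RPC path: MasterRpcServices#getCompactions reads it back via
      // CompactionService#getCompactionProtoList; a server that has not reported, or whose
      // entry was pruned once it was declared dead, yields an empty list rather than null.
      List<String> forServer(String serverName) {
        return latestByServer.getOrDefault(serverName, Collections.emptyList());
      }
    }

The protos are cached as-is rather than converted to POJOs, so the master pays no conversion cost on the heartbeat path and only serializes when answering clients -- the same reasoning recorded in the CompactionService comment below.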
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionService.java new file mode 100644 index 0000000000000000000000000000000000000000..526783ff4e20ff7d7d06c2a80eabb832ac231ac4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionService.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.compaction; + + +import java.util.List; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Service; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.CompactionProto; + +/** + * CompactionService receives and serves compaction information and handle compaction requests + * made by regionservers + */ +@InterfaceAudience.Private +public interface CompactionService extends Service { + // Using proto instead of converting them to POJO, inorder to avoid conversion when + // sending information to clients + void setCompactionProtoList(ServerName serverName, List compactionProtoList); + + List getCompactionProtoList(ServerName serverName); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..1cf7ba1b7afbbf3454bd5781730cce3949afb1af --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/compaction/CompactionServiceImpl.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.compaction; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Compaction; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AbstractService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.CompactionProto; + +/** + * Impl for exposing Compaction information and decision making. + */ +@InterfaceAudience.Private +public class CompactionServiceImpl extends AbstractService implements CompactionService { + private static Logger LOG = LoggerFactory.getLogger(CompactionServiceImpl.class); + private final MasterServices masterServices; + private final Map> onlineServersCompactionState = + new ConcurrentHashMap<>(); + private final Map> + onlineServersCompactionProtoState = new ConcurrentHashMap<>(); + + public CompactionServiceImpl(MasterServices masterServices) { + this.masterServices = masterServices; + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + protected void doStop() { + notifyStopped(); + } + + @Override + public List getCompactionProtoList(ServerName serverName) { + boolean isDead = masterServices.getServerManager().getDeadServers().isDeadServer(serverName); + if (isDead) { + onlineServersCompactionState.remove(serverName); + onlineServersCompactionProtoState.remove(serverName); + } + return onlineServersCompactionProtoState.get(serverName) != null ? + onlineServersCompactionProtoState.get(serverName) : Collections.emptyList(); + } + + @Override + public void setCompactionProtoList(ServerName serverName, + List compactionProtos) { + onlineServersCompactionProtoState.put(serverName,compactionProtos); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index fbf73f36ee8c1f10d59eb57bb969ade63a8cc143..5268e7ef7e5c827f99f04189a1a37a2c0e1509d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.hbase.regionserver.Store.NO_PRIORITY; import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER; - import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -36,8 +38,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntSupplier; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Compaction; +import org.apache.hadoop.hbase.client.CompactionState; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import 
org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; @@ -85,10 +90,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION = "hbase.regionserver.compaction.enabled"; + public static final String LONG_POOL = "long"; + public static final String SHORT_POOL = "short"; + private final HRegionServer server; private final Configuration conf; - private volatile ThreadPoolExecutor longCompactions; - private volatile ThreadPoolExecutor shortCompactions; + private volatile RunningTasksThreadPoolExecutor longCompactions; + private volatile RunningTasksThreadPoolExecutor shortCompactions; private volatile ThreadPoolExecutor splits; private volatile ThroughputController compactionThroughputController; @@ -141,7 +149,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati final String n = Thread.currentThread().getName(); StealJobQueue stealJobQueue = new StealJobQueue(COMPARATOR); - this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60, + this.longCompactions = new RunningTasksThreadPoolExecutor(largeThreads, largeThreads, 60, TimeUnit.SECONDS, stealJobQueue, new ThreadFactory() { @Override @@ -152,7 +160,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati }); this.longCompactions.setRejectedExecutionHandler(new Rejection()); this.longCompactions.prestartAllCoreThreads(); - this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60, + this.shortCompactions = new RunningTasksThreadPoolExecutor(smallThreads, smallThreads, 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(), new ThreadFactory() { @Override @@ -375,7 +383,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati pool = shortCompactions; } pool.execute( - new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); + new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user)); region.incrementCompactionsQueuedCount(); if (LOG.isDebugEnabled()) { String type = (pool == shortCompactions) ? "Small " : "Large "; @@ -536,10 +544,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati } }; - private final class CompactionRunner implements Runnable { + public final class CompactionRunner implements Runnable { private final HStore store; private final HRegion region; - private final CompactionContext compaction; + //It could be null. Common Case : System Compaction. Making it volatile to initialize it + // latter on when compaction context gets created. + private volatile CompactionContext compaction; private final CompactionLifeCycleTracker tracker; private final CompactionCompleteTracker completeTracker; private int queuedPriority; @@ -606,6 +616,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati ThreadPoolExecutor pool = store.throttleCompaction(c.getRequest().getSize()) ? 
longCompactions : shortCompactions; + compaction = c; // Long compaction pool can process small job // Short compaction pool should not process large job if (this.parent == shortCompactions && pool == longCompactions) { @@ -671,6 +682,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati doCompaction(user); } + public HStore getStore() { + return store; + } + + public HRegion getRegion() { + return region; + } + + public CompactionContext getCompaction() { + return compaction; + } + private String formatStackTrace(Exception ex) { StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw); @@ -824,18 +847,74 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled)); } - /** + /** * @return the longCompactions thread pool executor */ - ThreadPoolExecutor getLongCompactions() { + RunningTasksThreadPoolExecutor getLongCompactions() { return longCompactions; } /** * @return the shortCompactions thread pool executor */ - ThreadPoolExecutor getShortCompactions() { + RunningTasksThreadPoolExecutor getShortCompactions() { return shortCompactions; } + /** + * @return list of compactions in queue or are running. + */ + public List getCompactions() { + List compactions = new ArrayList<>(); + generateCompactions(compactions, getLongCompactions(), LONG_POOL); + generateCompactions(compactions, getShortCompactions(), SHORT_POOL); + return compactions; + } + + private void generateCompactions(List compactions, + RunningTasksThreadPoolExecutor runningTasksThreadPoolExecutor, String queue) { + List running = runningTasksThreadPoolExecutor.getRunning(); + BlockingQueue queued = runningTasksThreadPoolExecutor.getQueue(); + fillInSummary(compactions, queue, running, Compaction.CompactionExecutionState + .RUNNING); + fillInSummary(compactions, queue, queued, Compaction.CompactionExecutionState + .QUEUED); + } + + private void fillInSummary(List compactions, String queue, + Collection compactionRunnables, Compaction.CompactionExecutionState + compactionExecutionState) { + if (!compactionRunnables.isEmpty()) { + for (Runnable runnable : compactionRunnables) { + CompactSplit.CompactionRunner compactionRunner = (CompactSplit.CompactionRunner) runnable; + HStore store = compactionRunner.getStore(); + Compaction.CompactionBuilder compactionBuilder = new Compaction.CompactionBuilder(); + compactionBuilder.setStoreName(store.getColumnFamilyName()). + setRegionInfo(compactionRunner.getRegion().getRegionInfo()). + setCompactionState(CompactionState.NONE). + setCompactionExecutionState(compactionExecutionState). + setPoolType(queue); + + CompactionContext compactionContext = compactionRunner.getCompaction(); + if (compactionContext != null) { + CompactionRequestImpl request = compactionContext.getRequest(); + if (request != null) { + compactionBuilder.setCompactionState(request.isMajor() ? CompactionState.MAJOR : + CompactionState.MINOR). + setFileToCompact(request.getFiles().stream().map(f -> f.getPath().getName()) + .collect(Collectors.toList())); + } + } + + if (store.getCompactionProgress() != null) { + compactionBuilder.setTotalCompactingCells(store.getCompactionProgress(). + getTotalCompactingKVs()). + setCurrentCompactedCells(store.getCompactionProgress().getCurrentCompactedKvs()). 
+ setTotalCompactedSize(store.getCompactionProgress().getTotalCompactedSize()); + } + compactions.add(compactionBuilder.createCompaction()); + } + } + + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 85175be074eb9974b544152e10593fb7ab48fef6..bb85970d9543d375c2fe126655e8fc045b07f749 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.Compaction; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionInfo; @@ -1364,6 +1365,7 @@ public class HRegionServer extends HasThread implements } } + serverLoad.addAllCompaction(Compaction.toProtos(compactSplitThread.getCompactions())); return serverLoad.build(); } @@ -1376,6 +1378,8 @@ public class HRegionServer extends HasThread implements return sb.toString(); } + + /** * Wait on regions close. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RunningTasksThreadPoolExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RunningTasksThreadPoolExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..e72aa7c0438adf43a085cbeaae5e2f6b7b626c7b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RunningTasksThreadPoolExecutor.java @@ -0,0 +1,64 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is used to maintain the list of tasks running in thread pool executor + */ +@InterfaceAudience.Private +public class RunningTasksThreadPoolExecutor extends ThreadPoolExecutor { + private static final Logger LOG = LoggerFactory.getLogger(RunningTasksThreadPoolExecutor.class); + private List running = Collections.synchronizedList(new ArrayList()); + + public RunningTasksThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public List getRunning() { + return running; + } + + @Override + protected void beforeExecute(Thread t, Runnable r) { + super.beforeExecute(t, r); + running.add(r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + running.remove(r); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index ac20dbd0e2ba2096f8ca8de66e6658f742ea8cc3..0ee4e81f9605019ff20e1ab1b739abae02514bcc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; +import org.apache.hadoop.hbase.master.compaction.CompactionService; import org.apache.hadoop.hbase.master.locking.LockManager; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -145,6 +146,11 @@ public class MockNoopMasterServices implements MasterServices { return metricsMaster; } + @Override + public CompactionService getCompactionService() { + return null; + } + @Override public ServerManager getServerManager() { return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java index 91955f802fd9ac588e3d697fbdbc20ce355d11a8..de833d4f569ce91ed7ae211cb1a0fae09c790b57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterMetrics.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.compaction.CompactionService; +import org.apache.hadoop.hbase.master.compaction.CompactionServiceImpl; import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.testclassification.MasterTests; import 
org.apache.hadoop.hbase.testclassification.MediumTests; @@ -34,6 +36,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,6 +68,11 @@ public class TestMasterMetrics { long reportStartTime, long reportEndTime) { // do nothing } + + @Override + public CompactionService getCompactionService() { + return new CompactionServiceImpl(this); + } } @BeforeClass @@ -86,8 +94,8 @@ public class TestMasterMetrics { } @Test - public void testClusterRequests() throws Exception { + public void testClusterRequests() throws Exception { // sending fake request to master to see how metric value has changed RegionServerStatusProtos.RegionServerReportRequest.Builder request = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index a1d76fba3f148728e80516b3da09919698c8ad51..7ae129bd06c24d204d7a6721a007a35d52ef52e2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Compaction; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; @@ -289,6 +290,57 @@ public class TestCompaction { "thrown while completing a corrupt file"); } + @Test + public void testGetCompactions() throws Exception { + // setup a compact/split thread on a mock server + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplit thread = new CompactSplit(mockServer); + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + + // setup a region/store with some files + int numStores = r.getStores().size(); + int totalThreads = thread.getLargeCompactionThreadNum() + thread.getSmallCompactionThreadNum(); + //totalThreads is less than numStores , numStores-totalThreads compactions are in the queue. 
+    assertTrue(totalThreads < numStores);
+
+    CountDownLatch start = new CountDownLatch(totalThreads);
+    CountDownLatch done = new CountDownLatch(1);
+    StartEndTracker startEndTracker = new StartEndTracker(start, done);
+    // create some store files and set up requests for each store on which we want to do a
+    // compaction
+    for (HStore store : r.getStores()) {
+      for (int i = 0; i < compactionThreshold; i++) {
+        createStoreFile(r, store.getColumnFamilyName());
+      }
+      thread.requestCompaction(r, store, "Test getCompactions()", PRIORITY_USER,
+        startEndTracker, null);
+    }
+    start.await();
+
+    List<Compaction> compactions = thread.getCompactions();
+
+    assertEquals(numStores, compactions.size());
+
+    int running = 0;
+    int queued = 0;
+    for (Compaction compaction : compactions) {
+      assertEquals(compactionThreshold, compaction.getFileToCompact().size());
+      // Requests beyond the pool's thread count are still waiting in the queue.
+      if (compaction.getCompactionExecutionState()
+          == Compaction.CompactionExecutionState.QUEUED) {
+        queued++;
+      } else {
+        running++;
+      }
+    }
+    assertEquals(totalThreads, running);
+    assertEquals(numStores - totalThreads, queued);
+
+    done.countDown();
+    thread.interruptIfNecessary();
+  }
+
   /**
    * Create a custom compaction request and be sure that we can track it through the queue, knowing
    * when the compaction is completed.
@@ -786,7 +838,7 @@ public class TestCompaction {
    * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
    * finishes.
    */
-  public static class WaitThroughPutController extends NoLimitThroughputController{
+  public static class WaitThroughPutController extends NoLimitThroughputController {
 
     public WaitThroughPutController() {
     }
@@ -797,4 +849,32 @@ public class TestCompaction {
       return 6000000;
     }
   }
+
+  /**
+   * Simple {@link CompactionLifeCycleTracker} on which compacting threads will wait.
+   */
+  public static class StartEndTracker implements CompactionLifeCycleTracker {
+
+    private final CountDownLatch done;
+    private final CountDownLatch start;
+
+    public StartEndTracker(CountDownLatch start, CountDownLatch done) {
+      this.start = start;
+      this.done = done;
+    }
+
+    @Override
+    public void beforeExecution(Store store) {
+      start.countDown();
+    }
+
+    @Override
+    public void afterExecution(Store store) {
+      try {
+        done.await(60, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
 }
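A note on the latch protocol above: start.countDown() in beforeExecution tells the test that a compaction thread has actually started, while done.await(...) in afterExecution parks each finished thread so it cannot pick up a queued request before the running/queued assertions complete. Below is a stripped-down sketch of the same rendezvous pattern, independent of HBase; the class and variable names are illustrative only.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchGateDemo {
  public static void main(String[] args) throws Exception {
    final int threads = 2;
    final CountDownLatch started = new CountDownLatch(threads);
    final CountDownLatch done = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.execute(() -> {
        started.countDown(); // signal "I am running" (the beforeExecution role)
        try {
          done.await(60, TimeUnit.SECONDS); // stay occupied until released (the afterExecution role)
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    started.await(); // the test proceeds only once every worker is mid-flight
    System.out.println("all " + threads + " workers are now running");
    done.countDown(); // release the workers
    pool.shutdown();
  }
}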
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index 75d2de3543d2dac25ccc7e2911cbc8ead03f5428..8b3f4ef49de979677e9e37494ab5bfc4fa119b61 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -98,6 +98,19 @@ module Hbase
       @admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
     end
 
+    #----------------------------------------------------------------------------------------------
+    # Get compaction status for the given regionservers, or for all regionservers if none are given.
+    def compactions(regionserver_names)
+      region_servers = regionserver_names.flatten.compact
+      servers = java.util.ArrayList.new
+      if region_servers.any?
+        region_servers.each do |s|
+          servers.add(ServerName.valueOf(s))
+        end
+      end
+      @admin.getCompactions(servers)
+    end
+
     #----------------------------------------------------------------------------------------------
     # Gets compaction state for specified table
     def getCompactionState(table_name)
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index 1785928dec0b1a80f91573fe56ebff58ea89f1c0..f759681a408b4c0688466875debf8c7bb3cb30bc 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -351,6 +351,7 @@ Shell.load_command_group(
     cleaner_chore_enabled
     compact_rs
     compaction_state
+    compactions
     trace
     splitormerge_switch
     splitormerge_enabled
diff --git a/hbase-shell/src/main/ruby/shell/commands/compactions.rb b/hbase-shell/src/main/ruby/shell/commands/compactions.rb
new file mode 100644
index 0000000000000000000000000000000000000000..516e1a1cb89d4d827a50ebd006a17fbb0addcebf
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/compactions.rb
@@ -0,0 +1,58 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+  module Commands
+    # Compaction summary for a cluster
+    class Compactions < Command
+      def help
+        <<-EOF
+Get the details of all compactions, either running or queued.
+
+Examples:
+  From the specified regionservers:
+    hbase> compactions 'server2', 'server1'
+  From all regionservers:
+    hbase> compactions
+EOF
+      end
+
+      def command(*server)
+        formatter.header(['SERVER', 'COMPACTION_STATE', 'COMPACTION_TYPE', 'REGION', 'CF',
+                          'FILES', 'TOTAL_COMPACTING_KVs', 'CURRENT_COMPACTED_KVs',
+                          'TOTAL_COMPACTED_SIZE'])
+        compactions = admin.compactions(server)
+        compactions.each do |rs_server, list_of_compactions|
+          list_of_compactions.each do |v|
+            formatter.row([rs_server.getServerName,
+                           java.lang.String.valueOf(v.getCompactionExecutionState),
+                           java.lang.String.valueOf(v.getCompactionState),
+                           v.getRegionInfo.getRegionNameAsString,
+                           v.getStoreName,
+                           java.lang.String.valueOf(v.getFileToCompact),
+                           java.lang.String.valueOf(v.getTotalCompactingCells),
+                           java.lang.String.valueOf(v.getCurrentCompactedCells),
+                           java.lang.String.valueOf(v.getTotalCompactedSize)])
+          end
+        end
+        formatter.footer(compactions.size)
+      end
+    end
+  end
+end
-- 
2.15.2 (Apple Git-101.1)
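Postscript, not part of the patch: a sketch of how a Java client might consume the new Admin API. The Compaction accessors are the ones exercised by the shell command and test above (getCompactionExecutionState, getRegionInfo, getStoreName, getFileToCompact, and the compacted-cells counters); the connection setup is standard HBase client boilerplate, and getFileToCompact() is assumed to return a collection, as the test's size() call suggests.

import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Compaction;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ListCompactions {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin()) {
      // The no-argument overload queries all regionservers, per the Admin javadoc.
      Map<ServerName, List<Compaction>> all = admin.getCompactions();
      for (Map.Entry<ServerName, List<Compaction>> e : all.entrySet()) {
        for (Compaction c : e.getValue()) {
          System.out.println(e.getKey().getServerName()
              + " " + c.getCompactionExecutionState() // RUNNING or QUEUED
              + " " + c.getRegionInfo().getRegionNameAsString()
              + " " + c.getStoreName()
              + " files=" + c.getFileToCompact().size());
        }
      }
    }
  }
}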