.../apache/hadoop/hbase/executor/EventType.java | 9 +-
.../apache/hadoop/hbase/executor/ExecutorType.java | 3 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 5 +-
.../apache/hadoop/hbase/regionserver/HStore.java | 122 ++++++++-------------
.../CompactedHFilesDischargeHandler.java | 48 ++++++++
.../compactions/CompactionConfiguration.java | 3 +
6 files changed, 111 insertions(+), 79 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index ac76edb..a7759c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -265,7 +265,14 @@ public enum EventType {
*
* RS_REGION_REPLICA_FLUSH
*/
- RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
+ RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS),
+
+ /**
+ * RS compacted files discharger
+ *
+ * RS_COMPACTED_FILES_DISCHARGER
+ */
+ RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
private final int code;
private final ExecutorType executor;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index d0f6bee..5a16149 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -46,7 +46,8 @@ public enum ExecutorType {
RS_CLOSE_META (25),
RS_PARALLEL_SEEK (26),
RS_LOG_REPLAY_OPS (27),
- RS_REGION_REPLICA_FLUSH_OPS (28);
+ RS_REGION_REPLICA_FLUSH_OPS (28),
+ RS_COMPACTED_FILES_DISCHARGER (29);
ExecutorType(int value) {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 211fed5..eeab633 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -1716,7 +1717,9 @@ public class HRegionServer extends HasThread implements
}
this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
"hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
-
+ // Start the threads for compacted files discharger
+ this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
+ conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index badbd65..e37ae47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -56,12 +54,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Encryption;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -75,7 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischargeHandler;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
@@ -138,8 +137,6 @@ public class HStore implements Store {
static int closeCheckInterval = 0;
private volatile long storeSize = 0L;
private volatile long totalUncompressedBytes = 0L;
- private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
- private CompletionService<StoreFile> completionService = null;
/**
* RWLock for store operations.
@@ -273,10 +270,6 @@ public class HStore implements Store {
"hbase.hstore.flush.retries.number must be > 0, not "
+ flushRetriesNumber);
}
- compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
- conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
- completionService =
- new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor);
cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
}
@@ -801,7 +794,7 @@ public class HStore implements Store {
Collection<StoreFile> compactedfiles =
storeEngine.getStoreFileManager().clearCompactedFiles();
// clear the compacted files
- removeCompactedFiles(compactedfiles);
+ processCompactedfiles(compactedfiles);
if (!result.isEmpty()) {
// initialize the thread pool for closing store files in parallel.
ThreadPoolExecutor storeFileCloserThreadPool = this.region
@@ -843,9 +836,6 @@ public class HStore implements Store {
}
if (ioe != null) throw ioe;
}
- if (compactionCleanerthreadPoolExecutor != null) {
- compactionCleanerthreadPoolExecutor.shutdownNow();
- }
LOG.info("Closed " + this);
return result;
} finally {
@@ -2314,80 +2304,60 @@ public class HStore implements Store {
} finally {
lock.readLock().unlock();
}
- removeCompactedFiles(copyCompactedfiles);
- }
-
- private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
- return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "CompactedfilesArchiver-" + count++);
- }
- });
+ processCompactedfiles(copyCompactedfiles);
}
- private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException {
+ private void processCompactedfiles(Collection<StoreFile> compactedfiles) throws IOException {
if (compactedfiles != null && !compactedfiles.isEmpty()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Removing the compacted store files " + compactedfiles);
+ if(this.getHRegion().getRegionServerServices() != null) {
+ CompactedHFilesDischargeHandler handler =
+ new CompactedHFilesDischargeHandler((Server) this.getHRegion().getRegionServerServices(),
+ EventType.RS_COMPACTED_FILES_DISCHARGER, this, compactedfiles);
+ this.getHRegion().getRegionServerServices().getExecutorService().submit(handler);
+ } else {
+ archiveAndRemoveCompactedFiles(compactedfiles);
}
- for (final StoreFile file : compactedfiles) {
- completionService.submit(new Callable<StoreFile>() {
- @Override
- public StoreFile call() throws IOException {
- synchronized (file) {
- try {
- StoreFile.Reader r = file.getReader();
- if (r == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The file " + file + " was closed but still not archived.");
- }
- return file;
- }
- if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
- // Even if deleting fails we need not bother as any new scanners won't be
- // able to use the compacted file as the status is already compactedAway
- if (LOG.isTraceEnabled()) {
- LOG.trace("Closing and archiving the file " + file.getPath());
- }
- r.close(true);
- // Just close and return
- return file;
- }
- } catch (Exception e) {
- LOG.error("Exception while trying to close the compacted store file "
- + file.getPath().getName());
- }
+ }
+ }
+
+ public void archiveAndRemoveCompactedFiles(Collection<StoreFile> compactedfiles)
+ throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing the compacted store files " + compactedfiles);
+ }
+ final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+ for (final StoreFile file : compactedfiles) {
+ synchronized (file) {
+ try {
+ StoreFile.Reader r = file.getReader();
+ if (r == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("The file " + file + " was closed but still not archived.");
}
- return null;
+ filesToRemove.add(file);
}
- });
- }
- final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
- try {
- for (final StoreFile file : compactedfiles) {
- Future<StoreFile> future = completionService.take();
- StoreFile closedFile = future.get();
- if (closedFile != null) {
- filesToRemove.add(closedFile);
+ if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+ // Even if deleting fails we need not bother as any new scanners won't be
+ // able to use the compacted file as the status is already compactedAway
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Closing and archiving the file " + file.getPath());
+ }
+ r.close(true);
+ // Just close and return
+ filesToRemove.add(file);
}
+ } catch (Exception e) {
+ LOG.error(
+ "Exception while trying to close the compacted store file " + file.getPath().getName());
}
- } catch (InterruptedException ie) {
- LOG.error("Interrupted exception while closing the compacted files", ie);
- } catch (Exception e) {
- LOG.error("Exception occured while closing the compacted files", e);
}
- if (isPrimaryReplicaStore()) {
- archiveAndRemoveCompactedFiles(filesToRemove);
- }
-
+ }
+ if (this.isPrimaryReplicaStore()) {
+ removeCompactedfiles(filesToRemove);
}
}
- private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException {
+ private void removeCompactedfiles(List<StoreFile> filesToArchive) throws IOException {
if (!filesToArchive.isEmpty()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Moving the files " + filesToArchive + " to archive");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischargeHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischargeHandler.java
new file mode 100644
index 0000000..4f91760
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischargeHandler.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * Event handler that handles the removal and archival of the compacted hfiles
+ */
+public class CompactedHFilesDischargeHandler extends EventHandler {
+
+ private Collection<StoreFile> compactedfiles;
+ private HStore store;
+
+ public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store,
+ Collection<StoreFile> compactedfiles) {
+ super(server, eventType);
+ this.compactedfiles = compactedfiles;
+ this.store = store;
+ }
+
+ @Override
+ public void process() throws IOException {
+ this.store.archiveAndRemoveCompactedFiles(compactedfiles);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 62e7c7c..6ea4b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -64,6 +64,9 @@ public class CompactionConfiguration {
public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
"hbase.hstore.min.locality.to.skip.major.compact";
+ public static final String HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT =
+ "hbase.hstore.compaction.discharger.thread.count";
+
Configuration conf;
StoreConfigInformation storeConfigInfo;