 .../apache/hadoop/hbase/executor/EventType.java    |   9 +-
 .../apache/hadoop/hbase/executor/ExecutorType.java |   3 +-
 .../hadoop/hbase/executor/ExecutorService.java     |   4 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   5 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   | 124 ++++++++-------------
 .../CompactedHFilesDischargeHandler.java           |  50 +++++++++
 .../compactions/CompactionConfiguration.java       |   3 +
 .../regionserver/TestHRegionReplayEvents.java      |  11 +-
 .../hbase/regionserver/TestRegionReplicas.java     |   1 +
 9 files changed, 128 insertions(+), 82 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index ac76edb..a7759c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -265,7 +265,14 @@ public enum EventType {
    *
    * RS_REGION_REPLICA_FLUSH
    */
-  RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS);
+  RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS),
+
+  /**
+   * RS compacted files discharger
+   *
+   * RS_COMPACTED_FILES_DISCHARGER
+   */
+  RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER);
 
   private final int code;
   private final ExecutorType executor;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index d0f6bee..5a16149 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -46,7 +46,8 @@ public enum ExecutorType {
   RS_CLOSE_META          (25),
   RS_PARALLEL_SEEK       (26),
   RS_LOG_REPLAY_OPS      (27),
-  RS_REGION_REPLICA_FLUSH_OPS (28);
+  RS_REGION_REPLICA_FLUSH_OPS (28),
+  RS_COMPACTED_FILES_DISCHARGER (29);
 
   ExecutorType(int value) {}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 410fb39..335b672 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.ThreadMonitoring;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -78,7 +79,8 @@ public class ExecutorService {
    * started with the same name, this throws a RuntimeException.
    * @param name Name of the service to start.
    */
-  void startExecutorService(String name, int maxThreads) {
+  @VisibleForTesting
+  public void startExecutorService(String name, int maxThreads) {
     if (this.executorMap.get(name) != null) {
       throw new RuntimeException("An executor service with the name " + name +
         " is already running!");
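For orientation: an EventType is bound to the ExecutorType it is dispatched on, which is how handlers tagged with the new event land on the new pool once the region server starts it. A minimal sketch of that mapping; getExecutorServiceType() is assumed from EventType's existing accessors, so verify the name against the enum:

import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorType;

// Sketch only: each EventType carries the ExecutorType it is dispatched on.
public class DischargerRoutingSketch {
  public static void main(String[] args) {
    EventType event = EventType.RS_COMPACTED_FILES_DISCHARGER;
    ExecutorType pool = event.getExecutorServiceType(); // assumed accessor
    System.out.println(event + " runs on " + pool);     // -> RS_COMPACTED_FILES_DISCHARGER
  }
}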
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 1cd54fa..8ad6432 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -133,6 +133,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Repor
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -1716,7 +1717,9 @@ public class HRegionServer extends HasThread implements
     }
     this.service.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
       "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
-
+    // Start the threads for compacted files discharger
+    this.service.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
+      conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
     if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
       this.service.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
         conf.getInt("hbase.regionserver.region.replica.flusher.threads",
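The hunk above sizes the discharger pool from hbase.hstore.compaction.discharger.thread.count, defaulting to 10 threads. A small sketch of overriding that default programmatically; in a real deployment the key would normally go in hbase-site.xml instead:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;

// Sketch only: lowering the discharger pool from its default of 10 threads.
public class DischargerThreadCountSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT, 4);
    int threads = conf.getInt(
        CompactionConfiguration.HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
    System.out.println("discharger threads: " + threads); // 4
  }
}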
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index badbd65..27efcf1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -37,9 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -56,12 +54,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@@ -75,7 +75,7 @@ import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischargeHandler;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -90,7 +90,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
@@ -138,8 +137,6 @@ public class HStore implements Store {
   static int closeCheckInterval = 0;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
-  private ThreadPoolExecutor compactionCleanerthreadPoolExecutor = null;
-  private CompletionService<StoreFile> completionService = null;
 
   /**
    * RWLock for store operations.
@@ -273,10 +270,6 @@ public class HStore implements Store {
         "hbase.hstore.flush.retries.number must be > 0, not " + flushRetriesNumber);
     }
 
-    compactionCleanerthreadPoolExecutor = getThreadPoolExecutor(
-        conf.getInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 10));
-    completionService =
-        new ExecutorCompletionService<StoreFile>(compactionCleanerthreadPoolExecutor);
     cryptoContext = EncryptionUtil.createEncryptionContext(conf, family);
   }
 
@@ -801,7 +794,7 @@ public class HStore implements Store {
       Collection<StoreFile> compactedfiles =
           storeEngine.getStoreFileManager().clearCompactedFiles();
       // clear the compacted files
-      removeCompactedFiles(compactedfiles);
+      processCompactedfiles(compactedfiles);
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
         ThreadPoolExecutor storeFileCloserThreadPool = this.region
@@ -843,9 +836,6 @@ public class HStore implements Store {
         }
         if (ioe != null) throw ioe;
       }
-      if (compactionCleanerthreadPoolExecutor != null) {
-        compactionCleanerthreadPoolExecutor.shutdownNow();
-      }
       LOG.info("Closed " + this);
       return result;
     } finally {
@@ -2177,7 +2167,7 @@ public class HStore implements Store {
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align(ClassSize.OBJECT + (18 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
+      ClassSize.align(ClassSize.OBJECT + (16 * ClassSize.REFERENCE) + (10 * Bytes.SIZEOF_LONG)
           + (5 * Bytes.SIZEOF_INT) + (2 * Bytes.SIZEOF_BOOLEAN));
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@@ -2314,80 +2304,60 @@ public class HStore implements Store {
     } finally {
       lock.readLock().unlock();
     }
-    removeCompactedFiles(copyCompactedfiles);
-  }
-
-  private ThreadPoolExecutor getThreadPoolExecutor(int maxThreads) {
-    return Threads.getBoundedCachedThreadPool(maxThreads, maxThreads * 3, TimeUnit.SECONDS,
-        new ThreadFactory() {
-          private int count = 1;
-
-          @Override
-          public Thread newThread(Runnable r) {
-            return new Thread(r, "CompactedfilesArchiver-" + count++);
-          }
-        });
+    processCompactedfiles(copyCompactedfiles);
   }
 
-  private void removeCompactedFiles(Collection<StoreFile> compactedfiles) throws IOException {
+  private void processCompactedfiles(Collection<StoreFile> compactedfiles) throws IOException {
     if (compactedfiles != null && !compactedfiles.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Removing the compacted store files " + compactedfiles);
+      if (this.getHRegion().getRegionServerServices() != null) {
+        CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
+            (Server) this.getHRegion().getRegionServerServices(),
+            EventType.RS_COMPACTED_FILES_DISCHARGER, this, compactedfiles);
+        this.getHRegion().getRegionServerServices().getExecutorService().submit(handler);
+      } else {
+        archiveAndRemoveCompactedFiles(compactedfiles);
       }
-      for (final StoreFile file : compactedfiles) {
-        completionService.submit(new Callable<StoreFile>() {
-          @Override
-          public StoreFile call() throws IOException {
-            synchronized (file) {
-              try {
-                StoreFile.Reader r = file.getReader();
-                if (r == null) {
-                  if (LOG.isDebugEnabled()) {
-                    LOG.debug("The file " + file + " was closed but still not archived.");
-                  }
-                  return file;
-                }
-                if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
-                  // Even if deleting fails we need not bother as any new scanners won't be
-                  // able to use the compacted file as the status is already compactedAway
-                  if (LOG.isTraceEnabled()) {
-                    LOG.trace("Closing and archiving the file " + file.getPath());
-                  }
-                  r.close(true);
-                  // Just close and return
-                  return file;
-                }
-              } catch (Exception e) {
-                LOG.error("Exception while trying to close the compacted store file "
-                    + file.getPath().getName());
-              }
+    }
+  }
+
+  public void archiveAndRemoveCompactedFiles(Collection<StoreFile> compactedfiles)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing the compacted store files " + compactedfiles);
+    }
+    final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+    for (final StoreFile file : compactedfiles) {
+      synchronized (file) {
+        try {
+          StoreFile.Reader r = file.getReader();
+          if (r == null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("The file " + file + " was closed but still not archived.");
             }
-            return null;
           }
+            filesToRemove.add(file);
           }
-      });
-    }
-      final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
-      try {
-        for (final StoreFile file : compactedfiles) {
-          Future<StoreFile> future = completionService.take();
-          StoreFile closedFile = future.get();
-          if (closedFile != null) {
-            filesToRemove.add(closedFile);
+          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
+            // Even if deleting fails we need not bother as any new scanners won't be
+            // able to use the compacted file as the status is already compactedAway
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Closing and archiving the file " + file.getPath());
+            }
+            r.close(true);
+            // Just close and return
+            filesToRemove.add(file);
           }
+        } catch (Exception e) {
+          LOG.error(
+            "Exception while trying to close the compacted store file " + file.getPath().getName());
         }
-      } catch (InterruptedException ie) {
-        LOG.error("Interrupted exception while closing the compacted files", ie);
-      } catch (Exception e) {
-        LOG.error("Exception occured while closing the compacted files", e);
       }
-      if (isPrimaryReplicaStore()) {
-        archiveAndRemoveCompactedFiles(filesToRemove);
-      }
-
+    }
+    if (this.isPrimaryReplicaStore()) {
+      removeCompactedfiles(filesToRemove);
     }
   }
 
-  private void archiveAndRemoveCompactedFiles(List<StoreFile> filesToArchive) throws IOException {
+  private void removeCompactedfiles(List<StoreFile> filesToArchive) throws IOException {
     if (!filesToArchive.isEmpty()) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Moving the files " + filesToArchive + " to archive");
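The key behavioral change in processCompactedfiles() is the handoff rule: when live RegionServerServices are present, archival is queued on the new executor; otherwise it runs inline via archiveAndRemoveCompactedFiles(). A self-contained sketch of that pattern using plain JDK executors; the names here are illustrative, not HBase API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic sketch of the handoff-with-fallback pattern processCompactedfiles() follows.
public class HandoffWithFallbackSketch {
  private final ExecutorService pool; // null stands in for "no RegionServerServices"

  HandoffWithFallbackSketch(ExecutorService pool) {
    this.pool = pool;
  }

  void discharge(Runnable archiveWork) {
    if (pool != null) {
      pool.submit(archiveWork); // async, like the RS_COMPACTED_FILES_DISCHARGER path
    } else {
      archiveWork.run();        // inline fallback on the caller's thread
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    new HandoffWithFallbackSketch(pool).discharge(new Runnable() {
      @Override
      public void run() {
        System.out.println("archived asynchronously");
      }
    });
    pool.shutdown();
    new HandoffWithFallbackSketch(null).discharge(new Runnable() {
      @Override
      public void run() {
        System.out.println("archived inline");
      }
    });
  }
}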
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischargeHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischargeHandler.java
new file mode 100644
index 0000000..005eb21
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactedHFilesDischargeHandler.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+/**
+ * Event handler that handles the removal and archival of the compacted hfiles
+ */
+@InterfaceAudience.Private
+public class CompactedHFilesDischargeHandler extends EventHandler {
+
+  private Collection<StoreFile> compactedfiles;
+  private HStore store;
+
+  public CompactedHFilesDischargeHandler(Server server, EventType eventType, HStore store,
+      Collection<StoreFile> compactedfiles) {
+    super(server, eventType);
+    this.compactedfiles = compactedfiles;
+    this.store = store;
+  }
+
+  @Override
+  public void process() throws IOException {
+    this.store.archiveAndRemoveCompactedFiles(compactedfiles);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
index 62e7c7c..6ea4b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
@@ -64,6 +64,9 @@ public class CompactionConfiguration {
   public static final String HBASE_HSTORE_MIN_LOCALITY_TO_SKIP_MAJOR_COMPACT =
       "hbase.hstore.min.locality.to.skip.major.compact";
 
+  public static final String HBASE_HSTORE_COMPACTION_DISCHARGER_THREAD_COUNT =
+      "hbase.hstore.compaction.discharger.thread.count";
+
   Configuration conf;
   StoreConfigInformation storeConfigInfo;
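Putting the pieces together, the submission path looks roughly like this. A hedged sketch, not a definitive implementation; rss, store, and compactedFiles are assumed to come from a running region server, and the (Server) cast follows the HStore hunk above:

import java.util.Collection;

import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactedHFilesDischargeHandler;

// Hedged wiring sketch: hand compacted files to the new handler,
// mirroring what processCompactedfiles() does internally.
public class DischargeSubmitSketch {
  static void submitDischarge(RegionServerServices rss, HStore store,
      Collection<StoreFile> compactedFiles) {
    CompactedHFilesDischargeHandler handler = new CompactedHFilesDischargeHandler(
        (Server) rss, EventType.RS_COMPACTED_FILES_DISCHARGER, store, compactedFiles);
    rss.getExecutorService().submit(handler); // runs on the RS_COMPACTED_FILES_DISCHARGER pool
  }
}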
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index c59d6f7..2c62685 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -167,7 +168,11 @@ public class TestHRegionReplayEvents {
     when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
     when(rss.getConfiguration()).thenReturn(CONF);
     when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting());
-
+    String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER.toString();
+    ExecutorService es = new ExecutorService(string);
+    es.startExecutorService(
+      string+"-"+string, 1);
+    when(rss.getExecutorService()).thenReturn(es);
     primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
     primaryRegion.close();
 
@@ -1372,6 +1377,10 @@ public class TestHRegionReplayEvents {
     primaryRegion.compactStores();
     CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null, primaryRegion);
     cleaner.chore();
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+    }
     secondaryRegion.refreshStoreFiles();
     assertPathListsEqual(primaryRegion.getStoreFileList(families),
       secondaryRegion.getStoreFileList(families));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
index 67258aa..a790513 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicas.java
@@ -457,6 +457,7 @@ public class TestRegionReplicas {
       CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
         (HRegion) primaryRegion);
       cleaner.chore();
+      Thread.sleep(2000);
       // scan all the hfiles on the secondary.
       // since there are no read on the secondary when we ask locations to
       // the NN a FileNotFound exception will be returned and the FileLink
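Both tests compensate for the now-asynchronous discharge with a fixed Thread.sleep(2000) before asserting. If those sleeps ever prove flaky, a bounded polling wait is a common substitute; the helper below is purely illustrative and not part of this patch:

import java.util.concurrent.Callable;

// Illustrative alternative to the fixed 2-second sleeps above:
// poll the asserted condition with a deadline instead of sleeping blindly.
public final class WaitUtilSketch {
  public static void waitFor(long timeoutMs, Callable<Boolean> condition) throws Exception {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (!condition.call()) {
      if (System.currentTimeMillis() > deadline) {
        throw new AssertionError("timed out waiting for compacted files to be discharged");
      }
      Thread.sleep(100);
    }
  }
}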