 .../apache/hadoop/hbase/regionserver/HStore.java   | 112 +++++++++++++--------
 .../compactions/TestCompactedHFilesDischarger.java |  63 +++++++++++-
 2 files changed, 132 insertions(+), 43 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index ce5c91d..984d3e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -18,23 +18,14 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableCollection;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
@@ -62,9 +53,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagType;
-import org.apache.hadoop.hbase.TagUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
@@ -79,8 +67,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -91,6 +77,8 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -99,6 +87,13 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 /**
  * A Store holds a column family in a Region.  Its a memstore and a set of zero
  * or more StoreFiles, which stretch backwards over time.
@@ -188,6 +183,8 @@ public class HStore implements Store {
   private volatile long flushedOutputFileSize = 0;
   private volatile long compactedCellsSize = 0;
   private volatile long majorCompactedCellsSize = 0;
+  // Ensures that only one thread at a time executes removal of the compacted files
+  private AtomicBoolean removalInProgress = new AtomicBoolean(false);
 
   /**
    * Constructor
@@ -784,15 +781,14 @@ public class HStore implements Store {
 
   @Override
   public ImmutableCollection<StoreFile> close() throws IOException {
-    this.lock.writeLock().lock();
     try {
-      // Clear so metrics doesn't find them.
-      ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
-      Collection<StoreFile> compactedfiles =
-          storeEngine.getStoreFileManager().clearCompactedFiles();
-      // clear the compacted files
-      if (compactedfiles != null && !compactedfiles.isEmpty()) {
-        removeCompactedfiles(compactedfiles);
+      ImmutableCollection<StoreFile> result = null;
+      // the write lock is taken inside removeCompactedfiles()
+      try {
+        result = removeCompactedfiles(null, true);
+      } catch (InterruptedException ie) {
+        // won't happen
+        throw new IOException(ie);
       }
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
@@ -2366,20 +2362,46 @@ public class HStore implements Store {
       lock.readLock().unlock();
     }
     if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
-      removeCompactedfiles(copyCompactedfiles);
+      try {
+        removeCompactedfiles(copyCompactedfiles, false);
+      } catch (InterruptedException e) {
+        // won't happen
+        throw new IOException(e);
+      }
     }
   }
 
   /**
    * Archives and removes the compacted files
    * @param compactedfiles The compacted files in this store that are not active in reads
+   * @param getWriteLock true if this method must acquire the store write lock itself
    * @throws IOException
+   * @throws InterruptedException
+   * @return the set of cleared store files when invoked from {@link #close()}, otherwise null
    */
-  private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
-      throws IOException {
-    final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
-    for (final StoreFile file : compactedfiles) {
-      synchronized (file) {
+  private ImmutableCollection<StoreFile> removeCompactedfiles(Collection<StoreFile> compactedfiles, boolean getWriteLock)
+      throws IOException, InterruptedException {
+    ImmutableCollection<StoreFile> result = null;
+    try {
+      boolean success = removalInProgress.compareAndSet(false, true);
+      if (!success) {
+        synchronized (removalInProgress) {
+          // wait till the in-flight removal is done
+          removalInProgress.wait();
+        }
+      }
+      if (getWriteLock && compactedfiles == null) {
+        // take the write lock only after any concurrent removal has finished
+        this.lock.writeLock().lock();
+        // Clear so metrics doesn't find them.
+        result = storeEngine.getStoreFileManager().clearFiles();
+        compactedfiles = storeEngine.getStoreFileManager().clearCompactedFiles();
+        if (compactedfiles == null || compactedfiles.isEmpty()) {
+          return result;
+        }
+      }
+      final List<StoreFile> filesToRemove = new ArrayList<StoreFile>(compactedfiles.size());
+      for (final StoreFile file : compactedfiles) {
        try {
          StoreFileReader r = file.getReader();
          if (r == null) {
@@ -2403,23 +2425,29 @@ public class HStore implements Store {
             "Exception while trying to close the compacted store file " + file.getPath().getName());
         }
       }
-    }
-    if (this.isPrimaryReplicaStore()) {
-      // Only the primary region is allowed to move the file to archive.
Any active reads from - // the secondary region will still work because the file as such has active readers on it. - if (!filesToRemove.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Moving the files " + filesToRemove + " to archive"); + if (this.isPrimaryReplicaStore()) { + // Only the primary region is allowed to move the file to archive. + // The secondary region does not move the files to archive. Any active reads from + // the secondary region will still work because the file as such has active readers on it. + if (!filesToRemove.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Moving the files " + filesToRemove + " to archive"); + } + // Only if this is successful it has to be removed + this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove); } - // Only if this is successful it has to be removed - this.fs.removeStoreFiles(this.getFamily().getNameAsString(), filesToRemove); + } + if (!filesToRemove.isEmpty()) { + // Clear the compactedfiles from the store file manager + clearCompactedfiles(filesToRemove); + } + } finally { + removalInProgress.compareAndSet(true, false); + synchronized (removalInProgress) { + removalInProgress.notify(); } } - if (!filesToRemove.isEmpty()) { - // Clear the compactedfiles from the store file manager - clearCompactedfiles(filesToRemove); - } + return result; } @Override public void finalizeFlush() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java index c23e794..fe9f73d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactedHFilesDischarger.java @@ -229,7 +229,7 @@ public class TestCompactedHFilesDischarger { } @Test - public void testCleanerWithParallelScanners() throws Exception { + public void testStoreCloseAndDischargeRunningInParallel() throws Exception { // Create the cleaner object CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false); @@ -260,6 +260,67 @@ public class TestCompactedHFilesDischarger { assertEquals(3, store.getStorefilesCount()); Collection storefiles = store.getStorefiles(); + // None of the files should be in compacted state. 
+    for (StoreFile file : storefiles) {
+      assertFalse(file.isCompactedAway());
+    }
+    // Do compaction
+    region.compact(true);
+
+    // now run the cleaner
+    Thread thread = new Thread() {
+      public void run() {
+        cleaner.chore();
+      }
+    };
+    thread.start();
+    Thread thread2 = new Thread() {
+      public void run() {
+        // close the store in parallel with the discharger chore
+        try {
+          ((HRegion) region).close();
+        } catch (IOException e) {
+        }
+      }
+    };
+    thread2.start();
+    // no error should occur after the execution of the test
+    thread.join();
+    thread2.join();
+  }
+
+  @Test
+  public void testCleanerWithParallelScanners() throws Exception {
+    // Create the cleaner object
+    CompactedHFilesDischarger cleaner =
+        new CompactedHFilesDischarger(1000, (Stoppable) null, rss, false);
+    // Add some data to the region and do some flushes
+    for (int i = 1; i < 10; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 11; i < 20; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+    for (int i = 21; i < 30; i++) {
+      Put p = new Put(Bytes.toBytes("row" + i));
+      p.addColumn(fam, qual1, val);
+      region.put(p);
+    }
+    // flush them
+    region.flush(true);
+
+    Store store = region.getStore(fam);
+    assertEquals(3, store.getStorefilesCount());
+
+    Collection<StoreFile> storefiles = store.getStorefiles();
     Collection<StoreFile> compactedfiles =
         ((HStore) store).getStoreEngine().getStoreFileManager().getCompactedfiles();
     // None of the files should be in compacted state.
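Taken together, the HStore hunks implement a small election protocol around the discharge work, and the new test drives it from two threads at once: whichever of the discharger chore and close() wins the compareAndSet does the archiving, and the loser parks on the AtomicBoolean's monitor until it is notified. The self-contained sketch below reproduces just that handshake so it can be studied outside HBase. Every class and method name in it is invented for illustration (it is not HBase code), and it deliberately hardens the patch's bare wait()/notify() into a guarded-wait loop with notifyAll(), so a notification that fires before the loser starts waiting cannot be lost.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for HStore's removal coordination; all names are invented.
public class RemovalCoordinationSketch {
  private final AtomicBoolean removalInProgress = new AtomicBoolean(false);

  void removeCompactedFiles(String caller) throws InterruptedException {
    // Election: the thread that flips false -> true performs the removal.
    if (!removalInProgress.compareAndSet(false, true)) {
      synchronized (removalInProgress) {
        // Guarded wait: re-check the flag so a notification delivered before
        // we park is not missed (the patch itself uses a bare wait()).
        while (removalInProgress.get()) {
          removalInProgress.wait();
        }
      }
      // The other thread discharged the files; in the patch, the close() path
      // would now take the write lock and re-read the compacted-file set.
      return;
    }
    try {
      System.out.println(caller + ": archiving and clearing compacted files");
      Thread.sleep(100); // stand-in for the archive + clear work
    } finally {
      synchronized (removalInProgress) {
        removalInProgress.set(false);
        // The patch uses notify(); notifyAll() is the safer default when more
        // than one thread could be parked on the monitor.
        removalInProgress.notifyAll();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    RemovalCoordinationSketch store = new RemovalCoordinationSketch();
    Runnable task = () -> {
      try {
        store.removeCompactedFiles(Thread.currentThread().getName());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
    // Mimics the new test: the discharger chore and the store close race each other.
    Thread discharger = new Thread(task, "discharger-chore");
    Thread close = new Thread(task, "store-close");
    discharger.start();
    close.start();
    discharger.join();
    close.join();
  }
}

Run as-is, the sketch prints the "archiving" line exactly once per race no matter which thread wins, which is the invariant testStoreCloseAndDischargeRunningInParallel checks indirectly by expecting the concurrent close() and chore() to complete without error.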