Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java	(revision 1456769)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java	(working copy)
@@ -32,6 +32,14 @@
 @Category(SmallTests.class)
 public class TestDefaultStoreEngine {
+  private static final long CACHE_FLUSH_ID = 0L;
+
+  public static class DummyStoreFlusher extends DefaultStoreFlusher {
+    public DummyStoreFlusher(Long cacheFlushId, Configuration conf, Store store) {
+      super(cacheFlushId, conf, store);
+    }
+  }
+
   public static class DummyCompactor extends DefaultCompactor {
     public DummyCompactor(Configuration conf, Store store) {
       super(conf, store);
@@ -43,6 +51,16 @@
       super(conf, storeConfigInfo);
     }
   }
+
+  @Test
+  public void testCustomStoreFlusher() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, DummyStoreFlusher.class.getName());
+    Store mockStore = Mockito.mock(Store.class);
+    StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
+    Assert.assertTrue(se instanceof DefaultStoreEngine);
+    Assert.assertTrue(se.createStoreFlusher(CACHE_FLUSH_ID) instanceof DummyStoreFlusher);
+  }
 
   @Test
   public void testCustomPolicyAndCompactor() throws Exception {
@@ -51,7 +69,7 @@
     conf.set(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
         DummyCompactionPolicy.class.getName());
     Store mockStore = Mockito.mock(Store.class);
-    StoreEngine<?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
+    StoreEngine<?, ?, ?, ?> se = StoreEngine.create(mockStore, conf, new KVComparator());
     Assert.assertTrue(se instanceof DefaultStoreEngine);
     Assert.assertTrue(se.getCompactionPolicy() instanceof DummyCompactionPolicy);
     Assert.assertTrue(se.getCompactor() instanceof DummyCompactor);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(revision 1456769)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java	(working copy)
@@ -28,10 +28,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,14 +59,14 @@
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -548,7 +546,27 @@
     final Result result1b = region2.get(g);
     assertEquals(result.size(), result1b.size());
   }
+
+  // StoreFlusher implementation used in testReplayEditsAfterAbortingFlush.
+  // Only throws exception if throwExceptionWhenFlushing is set true.
+  public static class CustomStoreFlusher extends DefaultStoreFlusher {
+    // Switch between throw and not throw exception in flush
+    static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
+
+    public CustomStoreFlusher(Long cacheFlushId, Configuration conf,
+        Store store) {
+      super(cacheFlushId, conf, store);
+    }
+    @Override
+    public void flushCache(MonitoredTask status) throws IOException {
+      if (throwExceptionWhenFlushing.get()) {
+        throw new IOException("Simulated exception by tests");
+      }
+      super.flushCache(status);
+    }
+  };
+
   /**
    * Test that we could recover the data correctly after aborting flush. In the
    * test, first we abort flush after writing some data, then writing more data
@@ -569,28 +587,12 @@
     // of the families during the load of edits so its seqid is not same as
     // others to test we do right thing when different seqids.
     HLog wal = createWAL(this.conf);
-    final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false);
     RegionServerServices rsServices = Mockito.mock(RegionServerServices.class);
     Mockito.doReturn(false).when(rsServices).isAborted();
-    HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd,
-        rsServices) {
-      @Override
-      protected HStore instantiateHStore(final HColumnDescriptor family) throws IOException {
-        return new HStore(this, family, conf) {
-          @Override
-          protected Path flushCache(final long logCacheFlushId,
-              SortedSet<KeyValue> snapshot,
-              TimeRangeTracker snapshotTimeRangeTracker,
-              AtomicLong flushedSize, MonitoredTask status) throws IOException {
-            if (throwExceptionWhenFlushing.get()) {
-              throw new IOException("Simulated exception by tests");
-            }
-            return super.flushCache(logCacheFlushId, snapshot,
-                snapshotTimeRangeTracker, flushedSize, status);
-          }
-        };
-      }
-    };
+    Configuration customConf = new Configuration(this.conf);
+    customConf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, CustomStoreFlusher.class.getName());
+    HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd,
+        rsServices);
     long seqid = region.initialize();
     // HRegionServer usually does this. It knows the largest seqid across all
     // regions.
@@ -611,7 +613,7 @@
     assertEquals(writtenRowCount, getScannedCount(scanner));
 
     // Let us flush the region
-    throwExceptionWhenFlushing.set(true);
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(true);
     try {
       region.flushcache();
       fail("Injected exception hasn't been thrown");
@@ -631,7 +633,7 @@
     }
     writtenRowCount += moreRow;
     // call flush again
-    throwExceptionWhenFlushing.set(false);
+    CustomStoreFlusher.throwExceptionWhenFlushing.set(false);
     try {
       region.flushcache();
     } catch (IOException t) {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(revision 1456769)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java	(working copy)
@@ -122,7 +122,6 @@
   static int closeCheckInterval = 0;
   private volatile long storeSize = 0L;
   private volatile long totalUncompressedBytes = 0L;
-  private final Object flushLock = new Object();
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
   private final boolean verifyBulkLoads;
@@ -144,14 +143,10 @@
   // Comparing KeyValues
   private final KeyValue.KVComparator comparator;
 
-  final StoreEngine<?, ?, ?> storeEngine;
+  final StoreEngine<?, ?, ?, ?> storeEngine;
 
   private OffPeakCompactions offPeakCompactions;
 
-  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
-  private static int flush_retries_number;
-  private static int pauseTime;
-
   /**
    * Constructor
    * @param region
@@ -215,18 +210,6 @@
     this.checksumType = getChecksumType(conf);
     // initilize bytes per checksum
     this.bytesPerChecksum = getBytesPerChecksum(conf);
-    // Create a compaction manager.
-    if (HStore.flush_retries_number == 0) {
-      HStore.flush_retries_number = conf.getInt(
-          "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
-      HStore.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
-          HConstants.DEFAULT_HBASE_SERVER_PAUSE);
-      if (HStore.flush_retries_number <= 0) {
-        throw new IllegalArgumentException(
-            "hbase.hstore.flush.retries.number must be > 0, not "
-                + HStore.flush_retries_number);
-      }
-    }
   }
 
   /**
@@ -629,179 +612,13 @@
     this.memstore.snapshot();
   }
 
-  /**
-   * Write out current snapshot. Presumes {@link #snapshot()} has been called
-   * previously.
-   * @param logCacheFlushId flush sequence number
-   * @param snapshot
-   * @param snapshotTimeRangeTracker
-   * @param flushedSize The number of bytes flushed
-   * @param status
-   * @return Path The path name of the tmp file to which the store was flushed
-   * @throws IOException
-   */
-  protected Path flushCache(final long logCacheFlushId,
-      SortedSet<KeyValue> snapshot,
-      TimeRangeTracker snapshotTimeRangeTracker,
-      AtomicLong flushedSize,
-      MonitoredTask status) throws IOException {
-    // If an exception happens flushing, we let it out without clearing
-    // the memstore snapshot. The old snapshot will be returned when we say
-    // 'snapshot', the next time flush comes around.
-    // Retry after catching exception when flushing, otherwise server will abort
-    // itself
-    IOException lastException = null;
-    for (int i = 0; i < HStore.flush_retries_number; i++) {
-      try {
-        Path pathName = internalFlushCache(snapshot, logCacheFlushId,
-            snapshotTimeRangeTracker, flushedSize, status);
-        try {
-          // Path name is null if there is no entry to flush
-          if (pathName != null) {
-            validateStoreFile(pathName);
-          }
-          return pathName;
-        } catch (Exception e) {
-          LOG.warn("Failed validating store file " + pathName
-              + ", retring num=" + i, e);
-          if (e instanceof IOException) {
-            lastException = (IOException) e;
-          } else {
-            lastException = new IOException(e);
-          }
-        }
-      } catch (IOException e) {
-        LOG.warn("Failed flushing store file, retring num=" + i, e);
-        lastException = e;
-      }
-      if (lastException != null) {
-        try {
-          Thread.sleep(pauseTime);
-        } catch (InterruptedException e) {
-          IOException iie = new InterruptedIOException();
-          iie.initCause(e);
-          throw iie;
-        }
-      }
-    }
-    throw lastException;
-  }
-
-  /*
-   * @param cache
-   * @param logCacheFlushId
-   * @param snapshotTimeRangeTracker
-   * @param flushedSize The number of bytes flushed
-   * @return Path The path name of the tmp file to which the store was flushed
-   * @throws IOException
-   */
-  private Path internalFlushCache(final SortedSet<KeyValue> set,
+  @Override
+  public StoreFile commitStoreFile(final Path path,
       final long logCacheFlushId,
       TimeRangeTracker snapshotTimeRangeTracker,
       AtomicLong flushedSize,
       MonitoredTask status)
       throws IOException {
-    StoreFile.Writer writer;
-    // Find the smallest read point across all the Scanners.
-    long smallestReadPoint = region.getSmallestReadPoint();
-    long flushed = 0;
-    Path pathName;
-    // Don't flush if there are no entries.
-    if (set.size() == 0) {
-      return null;
-    }
-    // Use a store scanner to find which rows to flush.
-    // Note that we need to retain deletes, hence
-    // treat this as a minor compaction.
-    InternalScanner scanner = null;
-    KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
-    if (this.getCoprocessorHost() != null) {
-      scanner = this.getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
-    }
-    if (scanner == null) {
-      Scan scan = new Scan();
-      scan.setMaxVersions(scanInfo.getMaxVersions());
-      scanner = new StoreScanner(this, scanInfo, scan,
-          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
-          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
-    }
-    if (this.getCoprocessorHost() != null) {
-      InternalScanner cpScanner =
-        this.getCoprocessorHost().preFlush(this, scanner);
-      // NULL scanner returned from coprocessor hooks means skip normal processing
-      if (cpScanner == null) {
-        return null;
-      }
-      scanner = cpScanner;
-    }
-    try {
-      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
-      // TODO: We can fail in the below block before we complete adding this
-      // flush to list of store files. Add cleanup of anything put on filesystem
-      // if we fail.
-      synchronized (flushLock) {
-        status.setStatus("Flushing " + this + ": creating writer");
-        // A. Write the map out to the disk
-        writer = createWriterInTmp(set.size());
-        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-        pathName = writer.getPath();
-        try {
-          List<KeyValue> kvs = new ArrayList<KeyValue>();
-          boolean hasMore;
-          do {
-            hasMore = scanner.next(kvs, compactionKVMax);
-            if (!kvs.isEmpty()) {
-              for (KeyValue kv : kvs) {
-                // If we know that this KV is going to be included always, then let us
-                // set its memstoreTS to 0. This will help us save space when writing to
-                // disk.
-                if (kv.getMemstoreTS() <= smallestReadPoint) {
-                  // let us not change the original KV. It could be in the memstore
-                  // changing its memstoreTS could affect other threads/scanners.
-                  kv = kv.shallowCopy();
-                  kv.setMemstoreTS(0);
-                }
-                writer.append(kv);
-                flushed += this.memstore.heapSizeChange(kv, true);
-              }
-              kvs.clear();
-            }
-          } while (hasMore);
-        } finally {
-          // Write out the log sequence number that corresponds to this output
-          // hfile. Also write current time in metadata as minFlushTime.
-          // The hfile is current up to and including logCacheFlushId.
-          status.setStatus("Flushing " + this + ": appending metadata");
-          writer.appendMetadata(logCacheFlushId, false);
-          status.setStatus("Flushing " + this + ": closing flushed file");
-          writer.close();
-        }
-      }
-    } finally {
-      flushedSize.set(flushed);
-      scanner.close();
-    }
-    if (LOG.isInfoEnabled()) {
-      LOG.info("Flushed " +
-          ", sequenceid=" + logCacheFlushId +
-          ", memsize=" + StringUtils.humanReadableInt(flushed) +
-          ", into tmp file " + pathName);
-    }
-    return pathName;
-  }
-
-  /*
-   * @param path The pathname of the tmp file into which the store was flushed
-   * @param logCacheFlushId
-   * @return StoreFile created.
-   * @throws IOException
-   */
-  private StoreFile commitFile(final Path path,
-      final long logCacheFlushId,
-      TimeRangeTracker snapshotTimeRangeTracker,
-      AtomicLong flushedSize,
-      MonitoredTask status)
-      throws IOException {
     // Write-out finished successfully, move into the right spot
     Path dstPath = fs.commitStoreFile(getColumnFamilyName(), path);
@@ -823,15 +640,6 @@
 
   /*
    * @param maxKeyCount
-   * @return Writer for a new StoreFile in the tmp dir.
-   */
-  private StoreFile.Writer createWriterInTmp(long maxKeyCount)
-      throws IOException {
-    return createWriterInTmp(maxKeyCount, this.family.getCompression(), false);
-  }
-
-  /*
-   * @param maxKeyCount
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
    * @return Writer for a new StoreFile in the tmp dir.
@@ -861,15 +669,9 @@
     return w;
   }
 
-  /*
-   * Change storeFiles adding into place the Reader produced by this new flush.
-   * @param sf
-   * @param set That was used to make the passed file p.
-   * @throws IOException
-   * @return Whether compaction is required.
-   */
-  private boolean updateStorefiles(final StoreFile sf,
-      final SortedSet<KeyValue> set)
+  @Override
+  public boolean updateStorefiles(final StoreFile sf,
+      final SortedSet<KeyValue> set)
       throws IOException {
     this.lock.writeLock().lock();
     try {
@@ -1250,13 +1052,8 @@
     }
   }
 
-  /**
-   * Validates a store file by opening and closing it. In HFileV2 this should
-   * not be an expensive operation.
-   *
-   * @param path the path to the store file
-   */
-  private void validateStoreFile(Path path)
+  @Override
+  public void validateStoreFile(Path path)
       throws IOException {
     StoreFile storeFile = null;
     try {
@@ -1644,6 +1441,11 @@
     }
     return size;
   }
+
+  @Override
+  public MemStore getMemStore() {
+    return this.memstore;
+  }
 
   @Override
   public long getMemStoreSize() {
@@ -1726,54 +1528,11 @@
     }
   }
 
-  public StoreFlusher getStoreFlusher(long cacheFlushId) {
-    return new StoreFlusherImpl(cacheFlushId);
+  public StoreFlusher getStoreFlusher(long cacheFlushId)
+      throws IOException {
+    return this.storeEngine.createStoreFlusher(cacheFlushId);
   }
 
-  private class StoreFlusherImpl implements StoreFlusher {
-
-    private long cacheFlushId;
-    private SortedSet<KeyValue> snapshot;
-    private StoreFile storeFile;
-    private Path storeFilePath;
-    private TimeRangeTracker snapshotTimeRangeTracker;
-    private AtomicLong flushedSize;
-
-    private StoreFlusherImpl(long cacheFlushId) {
-      this.cacheFlushId = cacheFlushId;
-      this.flushedSize = new AtomicLong();
-    }
-
-    @Override
-    public void prepare() {
-      memstore.snapshot();
-      this.snapshot = memstore.getSnapshot();
-      this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
-    }
-
-    @Override
-    public void flushCache(MonitoredTask status) throws IOException {
-      storeFilePath = HStore.this.flushCache(
-          cacheFlushId, snapshot, snapshotTimeRangeTracker, flushedSize, status);
-    }
-
-    @Override
-    public boolean commit(MonitoredTask status) throws IOException {
-      if (storeFilePath == null) {
-        return false;
-      }
-      storeFile = HStore.this.commitFile(storeFilePath, cacheFlushId,
-          snapshotTimeRangeTracker, flushedSize, status);
-      if (HStore.this.getCoprocessorHost() != null) {
-        HStore.this.getCoprocessorHost().postFlush(HStore.this, storeFile);
-      }
-
-      // Add new file to store files. Clear snapshot too while we have
-      // the Store write lock.
-      return HStore.this.updateStorefiles(storeFile, snapshot);
-    }
-  }
-
   @Override
   public boolean needsCompaction() {
     return storeEngine.getCompactionPolicy().needsCompaction(
@@ -1786,7 +1545,7 @@
   }
 
   public static final long FIXED_OVERHEAD =
-      ClassSize.align((17 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+      ClassSize.align((16 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
           + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(revision 1456769)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java	(working copy)
@@ -21,6 +21,8 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -35,6 +37,7 @@
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -182,8 +185,37 @@
 
   public int getCompactPriority();
 
-  public StoreFlusher getStoreFlusher(long cacheFlushId);
+  public StoreFlusher getStoreFlusher(long cacheFlushId) throws IOException;
+
+  /**
+   * @param path The pathname of the tmp file into which the store was flushed
+   * @param logCacheFlushId
+   * @return StoreFile created.
+   * @throws IOException
+   */
+  public StoreFile commitStoreFile(final Path path,
+      final long logCacheFlushId,
+      TimeRangeTracker snapshotTimeRangeTracker,
+      AtomicLong flushedSize,
+      MonitoredTask status) throws IOException;
+
+  /**
+   * Validates a store file by opening and closing it.
+   *
+   * @param path the path to the store file
+   */
+  public void validateStoreFile(Path path) throws IOException;
+
+  /**
+   * Change storeFiles adding into place the Reader produced by this new flush.
+   * @param sf
+   * @param set That was used to make the passed file p.
+   * @throws IOException
+   * @return Whether compaction is required.
+   */
+  public boolean updateStorefiles(final StoreFile sf,
+      final SortedSet<KeyValue> set) throws IOException;
+
   // Split oriented methods
 
   public boolean canSplit();
@@ -220,6 +252,11 @@
   public boolean hasReferences();
 
   /**
+   * @return This store's memstore
+   */
+  public MemStore getMemStore();
+
+  /**
    * @return The size of this store's memstore, in bytes
    */
   public long getMemStoreSize();
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java	(revision 1456769)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java	(working copy)
@@ -38,7 +38,8 @@
  */
 @InterfaceAudience.Private
 public abstract class StoreEngine<
-    CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+    SF extends StoreFlusher, CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
+  protected SF storeFlusher;
   protected CP compactionPolicy;
   protected C compactor;
   protected SFM storeFileManager;
@@ -49,7 +50,7 @@
    */
   public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
 
-  private static final Class<? extends StoreEngine<?, ?, ?>>
+  private static final Class<? extends StoreEngine<?, ?, ?, ?>>
     DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
 
   /**
@@ -72,6 +73,12 @@
   public StoreFileManager getStoreFileManager() {
     return this.storeFileManager;
   }
+
+  /**
+   * Creates an instance of a store flusher specific to this engine.
+   * @return New StoreFlusher object.
+   */
+  public abstract StoreFlusher createStoreFlusher(long cacheFlushId) throws IOException;
 
   /**
    * Creates an instance of a compaction context specific to this engine.
@@ -101,11 +108,11 @@
    * @param kvComparator KVComparator for storeFileManager.
   * @return StoreEngine to use.
   */
-  public static StoreEngine<?, ?, ?> create(
+  public static StoreEngine<?, ?, ?, ?> create(
      Store store, Configuration conf, KVComparator kvComparator) throws IOException {
     String className = conf.get(STORE_ENGINE_CLASS_KEY, DEFAULT_STORE_ENGINE_CLASS.getName());
     try {
-      StoreEngine<?, ?, ?> se = ReflectionUtils.instantiateWithCustomCtor(
+      StoreEngine<?, ?, ?, ?> se = ReflectionUtils.instantiateWithCustomCtor(
           className, new Class[] { }, new Object[] { });
       se.createComponentsOnce(conf, store, kvComparator);
       return se;
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java	(revision 1456769)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java	(working copy)
@@ -39,22 +39,34 @@
  */
 @InterfaceAudience.Private
 public class DefaultStoreEngine extends StoreEngine<
-    DefaultCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+    DefaultStoreFlusher, DefaultCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
 
+  public static final String DEFAULT_STORE_FLUSHER_CLASS_KEY =
+      "hbase.hstore.defaultengine.storeflusher.class";
   public static final String DEFAULT_COMPACTOR_CLASS_KEY =
       "hbase.hstore.defaultengine.compactor.class";
   public static final String DEFAULT_COMPACTION_POLICY_CLASS_KEY =
       "hbase.hstore.defaultengine.compactionpolicy.class";
 
+  private static final Class<? extends DefaultStoreFlusher>
+      DEFAULT_STORE_FLUSHER_CLASS = DefaultStoreFlusher.class;
   private static final Class<? extends DefaultCompactor>
       DEFAULT_COMPACTOR_CLASS = DefaultCompactor.class;
   private static final Class<? extends DefaultCompactionPolicy>
       DEFAULT_COMPACTION_POLICY_CLASS = DefaultCompactionPolicy.class;
+
+  private Configuration conf;
+  private Store store;
+  private String storeFlusherClassName;
 
   @Override
   protected void createComponents(
       Configuration conf, Store store, KVComparator kvComparator) throws IOException {
+    this.conf = conf;
+    this.store = store;
     storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
+    storeFlusherClassName = conf.get(DEFAULT_STORE_FLUSHER_CLASS_KEY,
+        DEFAULT_STORE_FLUSHER_CLASS.getName());
     String className = conf.get(DEFAULT_COMPACTOR_CLASS_KEY, DEFAULT_COMPACTOR_CLASS.getName());
     try {
       compactor = ReflectionUtils.instantiateWithCustomCtor(className,
@@ -72,6 +84,18 @@
       throw new IOException("Unable to load configured compaction policy '" + className + "'", e);
     }
   }
+
+  @Override
+  public StoreFlusher createStoreFlusher(long cacheFlushId) throws IOException {
+    try {
+      return ReflectionUtils.instantiateWithCustomCtor(storeFlusherClassName,
+          new Class[] { Long.class, Configuration.class, Store.class },
+          new Object[] { cacheFlushId, conf, store });
+    } catch (Exception e) {
+      throw new IOException("Unable to load configured store flusher '" + storeFlusherClassName
+          + "'", e);
+    }
+  }
 
   @Override
   public CompactionContext createCompaction() {
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java	(revision 0)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java	(revision 0)
@@ -0,0 +1,251 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Default implementation of StoreFlusher.
+ */
+public class DefaultStoreFlusher implements StoreFlusher {
+
+  private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
+  private static int flush_retries_number;
+  private static int pauseTime;
+
+  private static final HashMap<Store, Object> flushLockMap =
+      new HashMap<Store, Object>();
+
+  static final Log LOG = LogFactory.getLog(DefaultStoreFlusher.class);
+
+  private Configuration conf;
+  private Store store;
+  private Object flushLock;
+  private long cacheFlushId;
+  private SortedSet<KeyValue> snapshot;
+  private StoreFile storeFile;
+  private Path storeFilePath;
+  private TimeRangeTracker snapshotTimeRangeTracker;
+  private AtomicLong flushedSize;
+
+  public DefaultStoreFlusher(Long cacheFlushId, Configuration conf, Store store) {
+    this.conf = conf;
+    this.store = store;
+    this.cacheFlushId = cacheFlushId;
+    this.flushedSize = new AtomicLong();
+
+    synchronized (flushLockMap) {
+      this.flushLock = flushLockMap.get(store);
+      if (this.flushLock == null) {
+        this.flushLock = new Object();
+        flushLockMap.put(store, flushLock);
+      }
+    }
+
+    // initialize flush_retries_number and pauseTime.
+    if (DefaultStoreFlusher.flush_retries_number == 0) {
+      DefaultStoreFlusher.flush_retries_number = conf.getInt(
+          "hbase.hstore.flush.retries.number", DEFAULT_FLUSH_RETRIES_NUMBER);
+      DefaultStoreFlusher.pauseTime = conf.getInt(HConstants.HBASE_SERVER_PAUSE,
+          HConstants.DEFAULT_HBASE_SERVER_PAUSE);
+      if (DefaultStoreFlusher.flush_retries_number <= 0) {
+        throw new IllegalArgumentException(
+            "hbase.hstore.flush.retries.number must be > 0, not "
+                + DefaultStoreFlusher.flush_retries_number);
+      }
+    }
+  }
+
+  @Override
+  public void prepare() {
+    store.getMemStore().snapshot();
+    this.snapshot = store.getMemStore().getSnapshot();
+    this.snapshotTimeRangeTracker = store.getMemStore().getSnapshotTimeRangeTracker();
+  }
+
+  @Override
+  public void flushCache(MonitoredTask status) throws IOException {
+    // If an exception happens flushing, we let it out without clearing
+    // the memstore snapshot.  The old snapshot will be returned when we say
+    // 'snapshot', the next time flush comes around.
+    // Retry after catching exception when flushing, otherwise server will abort
+    // itself
+    IOException lastException = null;
+    for (int i = 0; i < DefaultStoreFlusher.flush_retries_number; i++) {
+      try {
+        Path pathName = internalFlushCache(status);
+        try {
+          // Path name is null if there is no entry to flush
+          if (pathName != null) {
+            store.validateStoreFile(pathName);
+          }
+          storeFilePath = pathName;
+          return;
+        } catch (Exception e) {
+          LOG.warn("Failed validating store file " + pathName
+              + ", retrying num=" + i, e);
+          if (e instanceof IOException) {
+            lastException = (IOException) e;
+          } else {
+            lastException = new IOException(e);
+          }
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed flushing store file, retrying num=" + i, e);
+        lastException = e;
+      }
+      if (lastException != null) {
+        try {
+          Thread.sleep(pauseTime);
+        } catch (InterruptedException e) {
+          IOException iie = new InterruptedIOException();
+          iie.initCause(e);
+          throw iie;
+        }
+      }
+    }
+    throw lastException;
+  }
+
+  protected Path internalFlushCache(MonitoredTask status)
+      throws IOException {
+    StoreFile.Writer writer;
+    // Find the smallest read point across all the Scanners.
+    long smallestReadPoint = store.getSmallestReadPoint();
+    long flushed = 0;
+    Path pathName;
+    // Don't flush if there are no entries.
+    if (snapshot.size() == 0) {
+      return null;
+    }
+    // Use a store scanner to find which rows to flush.
+    // Note that we need to retain deletes, hence
+    // treat this as a minor compaction.
+    InternalScanner scanner = null;
+    KeyValueScanner memstoreScanner = new CollectionBackedScanner(snapshot, store.getComparator());
+    if (store.getCoprocessorHost() != null) {
+      scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+    }
+    if (scanner == null) {
+      Scan scan = new Scan();
+      scan.setMaxVersions(store.getScanInfo().getMaxVersions());
+      scanner = new StoreScanner(store, store.getScanInfo(), scan,
+          Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+          smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+    }
+    if (store.getCoprocessorHost() != null) {
+      InternalScanner cpScanner =
+          store.getCoprocessorHost().preFlush(store, scanner);
+      // NULL scanner returned from coprocessor hooks means skip normal processing
+      if (cpScanner == null) {
+        return null;
+      }
+      scanner = cpScanner;
+    }
+    try {
+      int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
+      // TODO:  We can fail in the below block before we complete adding this
+      // flush to list of store files.  Add cleanup of anything put on filesystem
+      // if we fail.
+      synchronized (flushLock) {
+        status.setStatus("Flushing " + store + ": creating writer");
+        // A. Write the map out to the disk
+        writer = store.createWriterInTmp(snapshot.size(), store.getFamily().getCompression(), false);
+        writer.setTimeRangeTracker(snapshotTimeRangeTracker);
+        pathName = writer.getPath();
+        try {
+          List<KeyValue> kvs = new ArrayList<KeyValue>();
+          boolean hasMore;
+          do {
+            hasMore = scanner.next(kvs, compactionKVMax);
+            if (!kvs.isEmpty()) {
+              for (KeyValue kv : kvs) {
+                // If we know that this KV is going to be included always, then let us
+                // set its memstoreTS to 0. This will help us save space when writing to
+                // disk.
+                if (kv.getMemstoreTS() <= smallestReadPoint) {
+                  // let us not change the original KV. It could be in the memstore
+                  // changing its memstoreTS could affect other threads/scanners.
+                  kv = kv.shallowCopy();
+                  kv.setMemstoreTS(0);
+                }
+                writer.append(kv);
+                flushed += MemStore.heapSizeChange(kv, true);
+              }
+              kvs.clear();
+            }
+          } while (hasMore);
+        } finally {
+          // Write out the log sequence number that corresponds to this output
+          // hfile. Also write current time in metadata as minFlushTime.
+          // The hfile is current up to and including logCacheFlushId.
+          status.setStatus("Flushing " + store + ": appending metadata");
+          writer.appendMetadata(cacheFlushId, false);
+          status.setStatus("Flushing " + store + ": closing flushed file");
+          writer.close();
+        }
+      }
+    } finally {
+      flushedSize.set(flushed);
+      scanner.close();
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Flushed " +
+          ", sequenceid=" + cacheFlushId +
+          ", memsize=" + StringUtils.humanReadableInt(flushed) +
+          ", into tmp file " + pathName);
+    }
+    return pathName;
+  }
+
+  @Override
+  public boolean commit(MonitoredTask status) throws IOException {
+    if (storeFilePath == null) {
+      return false;
+    }
+    storeFile = store.commitStoreFile(storeFilePath, cacheFlushId,
+        snapshotTimeRangeTracker, flushedSize, status);
+    if (store.getCoprocessorHost() != null) {
+      store.getCoprocessorHost().postFlush(store, storeFile);
+    }
+
+    // Add new file to store files. Clear snapshot too while we have
+    // the Store write lock.
+    return store.updateStorefiles(storeFile, snapshot);
+  }
+}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(revision 1456769)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java	(working copy)
@@ -924,7 +924,7 @@
    * @param notpresent True if the kv was NOT present in the set.
    * @return Size
    */
-  long heapSizeChange(final KeyValue kv, final boolean notpresent) {
+  static long heapSizeChange(final KeyValue kv, final boolean notpresent) {
     return notpresent ?
         ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize()): 0;
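
For anyone trying the patch out: the new hbase.hstore.defaultengine.storeflusher.class key makes the flush path pluggable. Below is a minimal sketch of a plugin, modeled on the DummyStoreFlusher and CustomStoreFlusher tests above; the TimingStoreFlusher class and its log message are hypothetical, not part of the patch. The one hard requirement the patch imposes is the (Long, Configuration, Store) constructor, since DefaultStoreEngine.createStoreFlusher() instantiates the configured class reflectively with exactly those argument types.

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.monitoring.MonitoredTask;
    import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
    import org.apache.hadoop.hbase.regionserver.Store;

    /**
     * Hypothetical example (not part of this patch): wraps the default flush
     * with simple timing, leaving retry/validation logic to the superclass.
     */
    public class TimingStoreFlusher extends DefaultStoreFlusher {
      private static final Log LOG = LogFactory.getLog(TimingStoreFlusher.class);

      // Required signature: DefaultStoreEngine.createStoreFlusher() looks up
      // a (Long, Configuration, Store) constructor via reflection.
      public TimingStoreFlusher(Long cacheFlushId, Configuration conf, Store store) {
        super(cacheFlushId, conf, store);
      }

      @Override
      public void flushCache(MonitoredTask status) throws IOException {
        long start = System.currentTimeMillis();
        super.flushCache(status);  // default retry loop + store file validation
        LOG.info("flush took " + (System.currentTimeMillis() - start) + "ms");
      }
    }

Registering it mirrors testCustomStoreFlusher: set the key before the region (and thus its StoreEngine) is created, e.g. conf.set(DefaultStoreEngine.DEFAULT_STORE_FLUSHER_CLASS_KEY, TimingStoreFlusher.class.getName()).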