diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ccfc69d704..12c0682767 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2435,6 +2435,82 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  public FlushResultImpl flushcache(Collection<HStore> specificStoresToFlush,
+      boolean writeFlushRequestWalMarker, FlushLifeCycleTracker tracker) throws IOException {
+    // fail-fast instead of waiting on the lock
+    if (this.closing.get()) {
+      String msg = "Skipping flush on " + this + " because closing";
+      LOG.debug(msg);
+      return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
+    }
+    MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
+    status.enableStatusJournal(false);
+    status.setStatus("Acquiring readlock on region");
+    // block waiting for the lock for flushing cache
+    lock.readLock().lock();
+    try {
+      if (this.closed.get()) {
+        String msg = "Skipping flush on " + this + " because closed";
+        LOG.debug(msg);
+        status.abort(msg);
+        return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
+      }
+      if (coprocessorHost != null) {
+        status.setStatus("Running coprocessor pre-flush hooks");
+        coprocessorHost.preFlush(tracker);
+      }
+      // TODO: this should be managed within memstore with the snapshot, updated only after flush
+      // successful
+      if (numMutationsWithoutWAL.sum() > 0) {
+        numMutationsWithoutWAL.reset();
+        dataInMemoryWithoutWAL.reset();
+      }
+      synchronized (writestate) {
+        if (!writestate.flushing && writestate.writesEnabled) {
+          this.writestate.flushing = true;
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("NOT flushing memstore for region " + this
+                + ", flushing=" + writestate.flushing + ", writesEnabled="
+                + writestate.writesEnabled);
+          }
+          String msg = "Not flushing since "
+              + (writestate.flushing ? "already flushing"
+                  : "writes not enabled");
+          status.abort(msg);
+          return new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH, msg, false);
+        }
+      }
+
+      try {
+        FlushResultImpl fs =
+            internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
+
+        if (coprocessorHost != null) {
+          status.setStatus("Running post-flush coprocessor hooks");
+          coprocessorHost.postFlush(tracker);
+        }
+
+        if (fs.isFlushSucceeded()) {
+          flushesQueued.reset();
+        }
+
+        status.markComplete("Flush successful");
+        return fs;
+      } finally {
+        synchronized (writestate) {
+          writestate.flushing = false;
+          this.writestate.flushRequested = false;
+          writestate.notifyAll();
+        }
+      }
+    } finally {
+      lock.readLock().unlock();
+      LOG.debug("Flush status journal:\n\t" + status.prettyPrintJournal());
+      status.cleanup();
+    }
+  }
+
   /**
    * Should the store be flushed because it is old enough.
   *

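The new overload above is what makes per-store flushing callable: the caller hands in
the subset of stores to flush instead of flushing every memstore in the region. A
minimal usage sketch, assuming an open HRegion named `region` with a column family
"a" (these names are illustrative, not part of this patch):

  // Hypothetical caller: flush only the store for family "a". The second
  // argument is the writeFlushRequestWalMarker flag (false here, as in the
  // test below); FlushLifeCycleTracker.DUMMY is the no-op tracker.
  Collection<HStore> stores = Collections.singletonList(region.getStore(Bytes.toBytes("a")));
  FlushResultImpl result = region.flushcache(stores, false, FlushLifeCycleTracker.DUMMY);
  if (!result.isFlushSucceeded()) {
    LOG.warn("Partial flush did not complete: " + result.getResult());
  }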
@@ -2921,7 +2997,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         + ", heapSize ~" + StringUtils.byteDesc(mss.getHeapSize()) + "/" + mss.getHeapSize()
         + ", currentSize=" + StringUtils.byteDesc(memstoresize) + "/" + memstoresize
         + " for " + this.getRegionInfo().getEncodedName() + " in " + time + "ms, sequenceid="
-        + flushOpSeqId + ", compaction requested=" + compactionRequested
+        + flushOpSeqId + ", maxFlushedSeqId=" + flushedSeqId + ", compaction requested=" + compactionRequested
         + ((wal == null) ? "; wal=null" : "");
     LOG.info(msg);
     status.setStatus(msg);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMaxFlushedSeqId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMaxFlushedSeqId.java
new file mode 100644
index 0000000000..0cd4e3e6b1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMaxFlushedSeqId.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
+import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Tests that a region's maxFlushedSeqId is tracked correctly across full and per-store flushes.
+ */
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestMaxFlushedSeqId {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMaxFlushedSeqId.class);
+
+  @Rule
+  public final TestName name = new TestName();
+
+  protected static final Logger LOG = LoggerFactory.getLogger(TestMaxFlushedSeqId.class);
+
+  protected static Configuration CONF;
+  protected static FileSystem FS;
+  protected static Path DIR;
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @Before
+  public void setUp() throws Exception {
+    FileStatus[] entries = FS.listStatus(new Path("/"));
+    for (FileStatus dir : entries) {
+      FS.delete(dir.getPath(), true);
+    }
+    final Path hbaseDir = TEST_UTIL.createRootDir();
+    final Path hbaseWALDir = TEST_UTIL.createWALRootDir();
+    DIR = new Path(hbaseWALDir, name.getMethodName());
+    assertNotEquals(hbaseDir, hbaseWALDir);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Make block sizes small.
+    TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
+    // quicker heartbeat interval for faster DN death notification
+    TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
+    TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.socket-timeout", 5000);
+
+    // faster failover with cluster.shutdown();fs.close() idiom
+    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connect.max.retries", 1);
+    TEST_UTIL.getConfiguration().setInt("dfs.client.block.recovery.retries", 1);
+    TEST_UTIL.getConfiguration().setInt("hbase.ipc.client.connection.maxidletime", 500);
+    TEST_UTIL.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+        SampleRegionWALCoprocessor.class.getName());
+    TEST_UTIL.startMiniDFSCluster(3);
+
+    CONF = TEST_UTIL.getConfiguration();
+    FS = TEST_UTIL.getDFSCluster().getFileSystem();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMaxFlushedSeqId() throws IOException, InterruptedException {
+    final String name = this.name.getMethodName();
+    final byte[] a = Bytes.toBytes("a");
+    final byte[] b = Bytes.toBytes("b");
+
+    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    final CountDownLatch holdAppend = new CountDownLatch(1);
+    final CountDownLatch flushFinished = new CountDownLatch(1);
+    final CountDownLatch putFinished = new CountDownLatch(1);
+
+    try (FSHLog log =
+        new FSHLog(FS, FSUtils.getRootDir(CONF), name, HConstants.HREGION_OLDLOGDIR_NAME, CONF,
+            null, true, null, null)) {
+      log.init();
+      log.registerWALActionsListener(new WALActionsListener() {
+        @Override
+        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+          if (startHoldingForAppend.get()) {
+            try {
+              holdAppend.await();
+            } catch (InterruptedException e) {
+              LOG.error(e.toString(), e);
+            }
+          }
+        }
+      });
+
+      // open a new region which uses this WAL
+      TableDescriptor htd =
+          TableDescriptorBuilder.newBuilder(TableName.valueOf(this.name.getMethodName()))
+              .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
+              .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b))
+              .build();
+      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+      ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+      final HRegion region = TEST_UTIL.createLocalHRegion(hri, htd, log);
+      ExecutorService exec = Executors.newFixedThreadPool(2);
+
+      // do a regular write first because of memstore size calculation.
+      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
+      region.put(put);
+
+      startHoldingForAppend.set(true);
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            region.put(new Put(b).addColumn(a, a, a).addColumn(b, b, b)
+                .setDurability(Durability.ASYNC_WAL));
+            putFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e.toString(), e);
+          }
+        }
+      });
+
+      // give the put a chance to start
+      Threads.sleep(3000);
+
+      exec.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            HRegion.FlushResult flushResult = region.flush(true);
+            LOG.info("Flush result:" + flushResult.getResult());
+            LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
+            flushFinished.countDown();
+          } catch (IOException e) {
+            LOG.error(e.toString(), e);
+          }
+        }
+      });
+
+      // give the flush a chance to start. By now the flush should have acquired
+      // the region lock and should be waiting on the mvcc to complete.
+      Threads.sleep(3000);
+
+      // let the append to the WAL go through now that the flush has started
+      holdAppend.countDown();
+      putFinished.await();
+      flushFinished.await();
+
+      // check whether the flush went through
+      assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size());
+
+      // now check the region's earliest unflushed seqId
+      long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
+      LOG.info("earliest unflushed seqId=" + seqId);
+//      assertEquals("Found seqId for the region which is already flushed",
+//          HConstants.NO_SEQNUM, seqId);
+
+      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
+
+      region.put(put);
+      // Only flush one store.
+      List<HStore> specificStoresToFlush = new ArrayList<>(1);
+      specificStoresToFlush.add(region.getStore(a));
+      region.flushcache(specificStoresToFlush, false, FlushLifeCycleTracker.DUMMY);
+
+      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
+      LOG.info("maxFlushedSeqId1=" + maxFlushedSeqId1 + ", maxFlushedSeqId2=" + maxFlushedSeqId2);
+
+      region.close();
+    }
+  }
+}
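The invariant the test exercises: after the partial flush of store a, the region's
maxFlushedSeqId may only advance to just below the earliest edit still unflushed in
store b, since everything at or above that edit is not yet on disk. A standalone
sketch of that bookkeeping, under the assumption that a per-store earliest-unflushed
sequence id is tracked; the class and the map parameter are hypothetical
illustrations, not HBase APIs:

import java.util.Map;

/** Illustrative only: the sequence-id bound that per-store flushes must respect. */
class MaxFlushedSeqIdSketch {
  static long maxFlushedSeqId(Map<String, Long> earliestUnflushedSeqIdPerStore,
      long lastWrittenSeqId) {
    // The region-level flushed point is bounded by the minimum earliest-unflushed
    // sequence id across all stores, minus one.
    long earliestUnflushed = earliestUnflushedSeqIdPerStore.values().stream()
        .mapToLong(Long::longValue).min().orElse(Long.MAX_VALUE);
    // No unflushed edits in any store: everything written so far is on disk.
    return earliestUnflushed == Long.MAX_VALUE ? lastWrittenSeqId : earliestUnflushed - 1;
  }
}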