From 1b98c11d6de0a4f07c7bd6093e37dc837bd8f511 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Sun, 7 Dec 2014 23:35:53 +0800
Subject: [PATCH] HBASE-10201 Port 'Make flush decisions per column family' to
 trunk

---
 .../org/apache/hadoop/hbase/HTableDescriptor.java  |  24 +
 hbase-common/src/main/resources/hbase-default.xml  |  24 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |   8 +
 .../hbase/regionserver/FlushAllStoresPolicy.java   |  35 ++
 .../hbase/regionserver/FlushLargeStoresPolicy.java | 101 ++++
 .../hadoop/hbase/regionserver/FlushPolicy.java     |  44 ++
 .../hbase/regionserver/FlushPolicyFactory.java     |  68 +++
 .../hadoop/hbase/regionserver/FlushRequester.java  |  15 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 199 ++++---
 .../hadoop/hbase/regionserver/HRegionServer.java   |   2 +-
 .../hadoop/hbase/regionserver/LogRoller.java       |   3 +-
 .../hadoop/hbase/regionserver/MemStoreFlusher.java |  76 +--
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  23 +-
 .../hadoop/hbase/regionserver/wal/FSHLog.java      | 242 +++++---
 .../hadoop/hbase/regionserver/wal/FSWALEntry.java  |  27 +-
 .../hadoop/hbase/wal/DisabledWALProvider.java      |   8 +-
 .../main/java/org/apache/hadoop/hbase/wal/WAL.java |  11 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java     |   4 +-
 .../hbase/regionserver/TestFlushRegionEntry.java   |   4 +-
 .../hbase/regionserver/TestHeapMemoryManager.java  |  16 +-
 .../regionserver/TestPerColumnFamilyFlush.java     | 638 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  |  42 +-
 .../hbase/regionserver/wal/TestWALReplay.java      |  19 +-
 .../hadoop/hbase/wal/TestDefaultWALProvider.java   |  74 +--
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |  38 +-
 25 files changed, 1462 insertions(+), 283 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 0ae0538..521fc5f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -130,6 +130,8 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
   private static final Bytes MEMSTORE_FLUSHSIZE_KEY =
       new Bytes(Bytes.toBytes(MEMSTORE_FLUSHSIZE));
 
+  public static final String FLUSH_POLICY = "FLUSH_POLICY";
+
   /**
    * <em>INTERNAL</em> Used by rest interface to access this metadata
    * attribute which denotes if the table is a -ROOT- region or not
@@ -766,6 +768,28 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> {
   }
 
   /**
+   * This sets the class associated with the flush policy which determines the stores that
+   * need to be flushed when flushing a region. The class used by default is defined in
+   * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+   * @param clazz the class name
+   */
+  public HTableDescriptor setFlushPolicyClassName(String clazz) {
+    setValue(FLUSH_POLICY, clazz);
+    return this;
+  }
+
+  /**
+   * This gets the class associated with the flush policy which determines the stores that
+   * need to be flushed when flushing a region. The class used by default is defined in
+   * {@link org.apache.hadoop.hbase.regionserver.FlushPolicy}
+   * @return the class name of the flush policy for this table. If this returns null, the default
+   *         flush policy is used.
+   */
+  public String getFlushPolicyClassName() {
+    return getValue(FLUSH_POLICY);
+  }
+
+  /**
    * Adds a column family.
    * For the updating purpose please use {@link #modifyFamily(HColumnDescriptor)} instead.
    * @param family HColumnDescriptor of family to add.
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index b9e96db..1c526e0 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -187,7 +187,7 @@ possible configurations would overwhelm and obscure the important.
       A value of 0 means a single queue shared between all the handlers.
       A value of 1 means that each handler has its own queue.
     </description>
-  </property>
+  </property>
   <property>
     <name>hbase.ipc.server.callqueue.read.ratio</name>
     <value>0</value>
     <description>Split the call queues into read and write queues.
@@ -330,7 +330,14 @@ possible configurations would overwhelm and obscure the important.
       A split policy determines when a region should be split. The various other
       split policies that are available currently are ConstantSizeRegionSplitPolicy,
       DisabledRegionSplitPolicy,
-      DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
+      DelimitedKeyPrefixRegionSplitPolicy, KeyPrefixRegionSplitPolicy etc.
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.flush.policy</name>
+    <value>org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy</value>
+    <description>
+      A flush policy determines the stores that need to be flushed when flushing a region.
     </description>
   </property>
@@ -588,6 +595,19 @@ possible configurations would overwhelm and obscure the important.
       every hbase.server.thread.wakefrequency.
     </description>
   </property>
+  <property>
+    <name>hbase.hregion.percolumnfamilyflush.size.lower.bound</name>
+    <value>16777216</value>
+    <description>
+      If FlushLargeStoresPolicy is used, then every time that we hit the
+      total memstore limit, we find out all the column families whose memstores
+      exceed this value, and flush only them, while retaining the others whose
+      memstores are lower than this limit. If none of the families have their
+      memstore size more than this, all the memstores will be flushed
+      (just as usual). This value should be less than half of the total memstore
+      threshold (hbase.hregion.memstore.flush.size).
+    </description>
+  </property>
   <property>
     <name>hbase.hregion.preclose.flush.size</name>
     <value>5242880</value>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 3e96c54..6e98226 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
+import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
@@ -1231,6 +1232,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
       throw new DoNotRetryIOException(ex);
     }
 
+    // check flush policy class can be loaded
+    try {
+      FlushPolicyFactory.getFlushPolicyClass(htd, conf);
+    } catch (Exception ex) {
+      throw new DoNotRetryIOException(ex);
+    }
+
     // check compression can be loaded
     try {
       checkCompression(htd);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
new file mode 100644
index 0000000..1a149bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that always flushes all stores for a given region.
+ */
+@InterfaceAudience.Private
+public class FlushAllStoresPolicy extends FlushPolicy {
+
+  @Override
+  public Collection<Store> selectStoresToFlush(Collection<Store> stores) {
+    return stores;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
new file mode 100644
index 0000000..8c50e3f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A {@link FlushPolicy} that only flushes stores larger than a given threshold. If no store is
+ * large enough, then all stores will be flushed.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class FlushLargeStoresPolicy extends FlushPolicy {
+
+  private static final Log LOG = LogFactory.getLog(FlushLargeStoresPolicy.class);
+
+  public static final String HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND =
+      "hbase.hregion.percolumnfamilyflush.size.lower.bound";
+
+  public static final String FLUSH_SIZE_LOWER_BOUND_KEY =
+      "FlushLargeStoresPolicy.flush_size_lower_bound";
+
+  private static final long DEFAULT_FLUSH_SIZE_LOWER_BOUND = 1024 * 1024 * 16L;
+
+  private HRegion region;
+
+  private long flushSizeLowerBound;
+
+  @Override
+  protected void configureForRegion(HRegion region) {
+    this.region = region;
+    long flushSizeLowerBound;
+    String flushedSizeLowerBoundString = region.getTableDesc().getValue(FLUSH_SIZE_LOWER_BOUND_KEY);
+    if (flushedSizeLowerBoundString == null) {
+      flushSizeLowerBound =
+          getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+            DEFAULT_FLUSH_SIZE_LOWER_BOUND);
+      LOG.warn(FLUSH_SIZE_LOWER_BOUND_KEY + " is not specified, use global config("
+          + flushSizeLowerBound + ") instead");
+    } else {
+      try {
+        flushSizeLowerBound = Long.parseLong(flushedSizeLowerBoundString);
+      } catch (NumberFormatException nfe) {
+        flushSizeLowerBound =
+            getConf().getLong(HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND,
+              DEFAULT_FLUSH_SIZE_LOWER_BOUND);
+        LOG.warn("Number format exception when parsing " + FLUSH_SIZE_LOWER_BOUND_KEY
+            + " for table " + region.getTableDesc().getTableName() + ":"
+            + flushedSizeLowerBoundString + ". " + nfe + ", use global config("
+            + flushSizeLowerBound + ") instead");
+      }
+    }
+    this.flushSizeLowerBound = flushSizeLowerBound;
+  }
+
+  @Override
+  public Collection<Store> selectStoresToFlush(Collection<Store> stores) {
+    Set<Store> specificStoresToFlush = new HashSet<Store>();
+    for (Store store : stores) {
+      if (store.getMemStoreSize() > this.flushSizeLowerBound) {
+        specificStoresToFlush.add(store);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Column Family: " + store.getColumnFamilyName() + " of region " + region
+              + " will be flushed");
+        }
+      }
+    }
+    // Didn't find any CFs which were above the threshold for selection.
+    if (specificStoresToFlush.size() == 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Since none of the CFs were above the size, flushing all.");
+      }
+      return stores;
+    } else {
+      return specificStoresToFlush;
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
new file mode 100644
index 0000000..9f976ff
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A flush policy determines the stores that need to be flushed when flushing a region.
+ */
+@InterfaceAudience.Private
+public abstract class FlushPolicy extends Configured {
+
+  /**
+   * Upon construction, this method will be called with the region to be governed. It will be
+   * called once and only once.
+   */
+  protected void configureForRegion(HRegion region) {
+  }
+
+  /**
+   * @param stores all stores for a given region.
+   * @return the stores that need to be flushed.
+   */
+  public abstract Collection<Store> selectStoresToFlush(Collection<Store> stores);
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
new file mode 100644
index 0000000..fa5b51d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicyFactory.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.util.ReflectionUtils; + +/** + * The class that creates a flush policy from a conf and HTableDescriptor. + * @see {@link FlushLargeStoresPolicy} Default flush policy now + * @see {@link FlushAllStoresPolicy} Default flush policy for 0.98 + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class FlushPolicyFactory { + + public static final String HBASE_FLUSH_POLICY_KEY = "hbase.regionserver.flush.policy"; + + private static final Class DEFAULT_FLUSH_POLICY_CLASS = + FlushLargeStoresPolicy.class; + + /** + * Create the FlushPolicy configured for the given table. + */ + public static FlushPolicy create(HRegion region, Configuration conf) throws IOException { + Class clazz = getFlushPolicyClass(region.getTableDesc(), conf); + FlushPolicy policy = ReflectionUtils.newInstance(clazz, conf); + policy.configureForRegion(region); + return policy; + } + + /** + * Get FlushPolicy class for the given table. + */ + public static Class getFlushPolicyClass(HTableDescriptor htd, + Configuration conf) throws IOException { + String className = htd.getFlushPolicyClassName(); + if (className == null) { + className = conf.get(HBASE_FLUSH_POLICY_KEY, DEFAULT_FLUSH_POLICY_CLASS.getName()); + } + try { + Class clazz = Class.forName(className).asSubclass(FlushPolicy.class); + return clazz; + } catch (Exception e) { + throw new IOException("Unable to load configured flush policy '" + className + + "' for table '" + htd.getTableName() + "'", e); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index e1c3144..7517454 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -30,26 +30,31 @@ public interface FlushRequester { * Tell the listener the cache needs to be flushed. * * @param region the HRegion requesting the cache flush + * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log + * rolling. */ - void requestFlush(HRegion region); + void requestFlush(HRegion region, boolean forceFlushAllStores); + /** * Tell the listener the cache needs to be flushed after a delay * * @param region the HRegion requesting the cache flush * @param delay after how much time should the flush happen + * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log + * rolling. */ - void requestDelayedFlush(HRegion region, long delay); + void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores); /** * Register a FlushRequestListener - * + * * @param listener */ void registerFlushRequestListener(final FlushRequestListener listener); /** * Unregister the given FlushRequestListener - * + * * @param listener * @return true when passed listener is unregistered successfully. */ @@ -57,7 +62,7 @@ public interface FlushRequester { /** * Sets the global memstore limit to a new size. 
- * + * * @param globalMemStoreSize */ public void setGlobalMemstoreLimit(long globalMemStoreSize); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index fd5225c..0998668 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -41,6 +42,7 @@ import java.util.TreeMap; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -62,7 +64,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -89,6 +90,7 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagType; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.backup.HFileArchiver; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -132,14 +134,9 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.Write import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.wal.HLogKey; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter; -import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; -import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils; import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.util.Bytes; @@ -155,6 +152,11 @@ import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; @@ -516,7 +518,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // long memstoreFlushSize; final long timestampSlop; final long rowProcessorTimeout; 
- private volatile long lastFlushTime; + + // Last flush time for each Store. Useful when we are flushing for each column + private final ConcurrentMap lastStoreFlushTimeMap = + new ConcurrentHashMap(); + final RegionServerServices rsServices; private RegionServerAccounting rsAccounting; private long flushCheckInterval; @@ -540,6 +546,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // private HTableDescriptor htableDescriptor = null; private RegionSplitPolicy splitPolicy; + private FlushPolicy flushPolicy; private final MetricsRegion metricsRegion; private final MetricsRegionWrapperImpl metricsRegionWrapper; @@ -616,7 +623,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // throw new IllegalArgumentException(MEMSTORE_FLUSH_PER_CHANGES + " can not exceed " + MAX_FLUSH_PER_CHANGES); } - this.rowLockWaitDuration = conf.getInt("hbase.rowlock.wait.duration", DEFAULT_ROWLOCK_WAIT_DURATION); @@ -775,8 +781,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Initialize split policy this.splitPolicy = RegionSplitPolicy.create(this, conf); - this.lastFlushTime = EnvironmentEdgeManager.currentTime(); - // Use maximum of wal sequenceid or that which was found in stores + // Initialize flush policy + this.flushPolicy = FlushPolicyFactory.create(this, conf); + + long lastFlushTime = EnvironmentEdgeManager.currentTime(); + for (Store store: stores.values()) { + this.lastStoreFlushTimeMap.put(store, lastFlushTime); + } + + // Use maximum of log sequenceid or that which was found in stores // (particularly if no recovered edits, seqid will be -1). long nextSeqid = maxSeqId + 1; if (this.isRecovering) { @@ -1303,10 +1316,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // status.setStatus("Running coprocessor post-close hooks"); this.coprocessorHost.postClose(abort); } - if ( this.metricsRegion != null) { + if (this.metricsRegion != null) { this.metricsRegion.close(); } - if ( this.metricsRegionWrapper != null) { + if (this.metricsRegionWrapper != null) { Closeables.closeQuietly(this.metricsRegionWrapper); } status.markComplete("Closed"); @@ -1445,9 +1458,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.fs; } - /** @return the last time the region was flushed */ - public long getLastFlushTime() { - return this.lastFlushTime; + /** + * @return Returns the earliest time a store in the region was flushed. All + * other stores in the region would have been flushed either at, or + * after this time. + */ + @VisibleForTesting + public long getEarliestFlushTimeForAllStores() { + return Collections.min(lastStoreFlushTimeMap.values()); + } + + /** + * @return Returns the latest time a store in the region was flushed, which is + * the last flush time of this region. + */ + long getLatestFlushTimeForAllStores() { + return Collections.max(lastStoreFlushTimeMap.values()); } ////////////////////////////////////////////////////////////////////////////// @@ -1611,6 +1637,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } /** + * Flush all stores. + *
+   * See {@link #flushcache(boolean)}.
+   *
+   * @return whether the flush succeeded and whether the region needs compacting
+   * @throws IOException
+   */
+  public FlushResult flushcache() throws IOException {
+    return flushcache(true);
+  }
+
+  /**
    * Flush the cache.
    *
    * When this method is called the cache will be flushed unless:
@@ -1623,14 +1661,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    *
    * <p>This method may block for some time, so it should not be called from a
   * time-sensitive thread.
-   *
-   * @return true if the region needs compacting
+   * @param forceFlushAllStores whether we want to flush all stores
+   * @return whether the flush succeeded and whether the region needs compacting
    *
    * @throws IOException general io exceptions
    * @throws DroppedSnapshotException Thrown when replay of wal is required
    * because a Snapshot was not properly persisted.
    */
-  public FlushResult flushcache() throws IOException {
+  public FlushResult flushcache(boolean forceFlushAllStores) throws IOException {
     // fail-fast instead of waiting on the lock
     if (this.closing.get()) {
       String msg = "Skipping flush on " + this + " because closing";
@@ -1672,8 +1710,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
       }
     }
+
     try {
-      FlushResult fs = internalFlushcache(status);
+      Collection<Store> specificStoresToFlush =
+          forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(this.stores
+              .values());
+      FlushResult fs = internalFlushcache(this.wal, -1, specificStoresToFlush, status);
 
       if (coprocessorHost != null) {
         status.setStatus("Running post-flush coprocessor hooks");
@@ -1709,7 +1751,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
     long now = EnvironmentEdgeManager.currentTime();
     //if we flushed in the recent past, we don't need to do again now
-    if ((now - getLastFlushTime() < flushCheckInterval)) {
+    if ((now - getLatestFlushTimeForAllStores() < flushCheckInterval)) {
       return false;
     }
     //since we didn't flush in the recent past, flush now if certain conditions
@@ -1741,18 +1783,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
    */
   protected FlushResult internalFlushcache(MonitoredTask status) throws IOException {
-    return internalFlushcache(this.wal, -1, status);
+    return internalFlushcache(this.wal, -1, stores.values(), status);
   }
 
   /**
    * @param wal Null if we're NOT to go via wal.
    * @param myseqid The seqid to use if wal is null writing out flush file.
+   * @param storesToFlush The list of stores to flush.
    * @return object describing the flush's state
    * @throws IOException
    * @see #internalFlushcache(MonitoredTask)
    */
-  protected FlushResult internalFlushcache(
-      final WAL wal, final long myseqid, MonitoredTask status) throws IOException {
+  protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
+      final Collection<Store> storesToFlush, MonitoredTask status) throws IOException {
     if (this.rsServices != null && this.rsServices.isAborted()) {
       // Don't flush when server aborting, it's unsafe
       throw new IOException("Aborting flush because server is aborted...");
@@ -1794,53 +1837,66 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
       }
     }
 
-    LOG.info("Started memstore flush for " + this +
-      ", current region memstore size " +
-      StringUtils.byteDesc(this.memstoreSize.get()) +
-      ((wal != null)? "": "; wal is null, using passed sequenceid=" + myseqid));
-
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Started memstore flush for " + this + ", current region memstore size "
+          + StringUtils.byteDesc(this.memstoreSize.get()) + ", and " + storesToFlush.size() + "/"
+          + stores.size() + " column families' memstores are being flushed."
+          + ((wal != null) ?
"" : "; wal is null, using passed sequenceid=" + myseqid)); + for (Store store : storesToFlush) { + LOG.info("Flushing Column Family: " + store.getColumnFamilyName() + " which was occupying " + + StringUtils.byteDesc(store.getMemStoreSize()) + " of memstore."); + } + } // Stop updates while we snapshot the memstore of all of these regions' stores. We only have // to do this for a moment. It is quick. We also set the memstore size to zero here before we // allow updates again so its value will represent the size of the updates received // during flush MultiVersionConsistencyControl.WriteEntry w = null; - // We have to take an update lock during snapshot, or else a write could end up in both snapshot // and memstore (makes it difficult to do atomic rows then) status.setStatus("Obtaining lock to block concurrent updates"); // block waiting for the lock for internal flush this.updatesLock.writeLock().lock(); - long totalFlushableSize = 0; status.setStatus("Preparing to flush by snapshotting stores in " + getRegionInfo().getEncodedName()); + long totalFlushableSizeOfFlushableStores = 0; + + Set flushedFamilyNames = new HashSet(); + for (Store store: storesToFlush) { + flushedFamilyNames.add(store.getFamily().getName()); + } + List storeFlushCtxs = new ArrayList(stores.size()); TreeMap> committedFiles = new TreeMap>( Bytes.BYTES_COMPARATOR); - long flushSeqId = -1L; + long flushSeqId = HConstants.NO_SEQNUM; + byte[] encodedRegionName = getRegionInfo().getEncodedNameAsBytes(); long trxId = 0; try { try { w = mvcc.beginMemstoreInsert(); if (wal != null) { - if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) { + if (!wal.startCacheFlush(encodedRegionName, flushedFamilyNames)) { // This should never happen. String msg = "Flush will not be started for [" + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing."; status.setStatus(msg); return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg); } - // Get a sequence id that we can use to denote the flush. It will be one beyond the last - // edit that made it into the hfile (the below does not add an edit, it just asks the - // WAL system to return next sequence edit). - flushSeqId = getNextSequenceId(wal); + long oldestUnflushedSeqId = wal + .getEarliestMemstoreSeqNum(encodedRegionName); + // no oldestUnflushedSeqId means we flushed all stores. + // or the unflushed stores are all empty. + flushSeqId = oldestUnflushedSeqId == HConstants.NO_SEQNUM ? getNextSequenceId(wal) + : oldestUnflushedSeqId - 1; } else { // use the provided sequence Id as WAL is not being used for this flush. flushSeqId = myseqid; } - for (Store s : stores.values()) { - totalFlushableSize += s.getFlushableSize(); + for (Store s : storesToFlush) { + totalFlushableSizeOfFlushableStores += s.getFlushableSize(); storeFlushCtxs.add(s.createFlushContext(flushSeqId)); committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL } @@ -1849,8 +1905,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if (wal != null) { FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, getRegionInfo(), flushSeqId, committedFiles); + // no sync. Sync is below where we do not hold the updates lock trxId = WALUtil.writeFlushMarker(wal, this.htableDescriptor, getRegionInfo(), - desc, sequenceId, false); // no sync. 
Sync is below where we do not hold the updates lock + desc, sequenceId, false); } // Prepare flush (take a snapshot) @@ -1879,7 +1936,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // this.updatesLock.writeLock().unlock(); } String s = "Finished memstore snapshotting " + this + - ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSize; + ", syncing WAL and waiting on mvcc, flushsize=" + totalFlushableSizeOfFlushableStores; status.setStatus(s); if (LOG.isTraceEnabled()) LOG.trace(s); // sync unflushed WAL changes @@ -1929,8 +1986,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Switch snapshot (in memstore) -> new hfile (thus causing // all the store scanners to reset/reseek). - Iterator it = stores.values().iterator(); // stores.values() and storeFlushCtxs have - // same order + Iterator it = storesToFlush.iterator(); + // stores.values() and storeFlushCtxs have same order for (StoreFlushContext flush : storeFlushCtxs) { boolean needsCompaction = flush.commit(status); if (needsCompaction) { @@ -1941,7 +1998,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // storeFlushCtxs.clear(); // Set down the memstore size by amount of flush. - this.addAndGetGlobalMemstoreSize(-totalFlushableSize); + this.addAndGetGlobalMemstoreSize(-totalFlushableSizeOfFlushableStores); if (wal != null) { // write flush marker to WAL. If fail, we should throw DroppedSnapshotException @@ -1983,9 +2040,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } // Record latest flush time - this.lastFlushTime = EnvironmentEdgeManager.currentTime(); + for (Store store: storesToFlush) { + this.lastStoreFlushTimeMap.put(store, startTime); + } - // Update the last flushed sequence id for region. TODO: This is dup'd inside the WAL/FSHlog. + // Update the oldest unflushed sequence id for region. this.lastFlushSeqId = flushSeqId; // C. Finally notify anyone waiting on memstore to clear: @@ -1996,13 +2055,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // long time = EnvironmentEdgeManager.currentTime() - startTime; long memstoresize = this.memstoreSize.get(); - String msg = "Finished memstore flush of ~" + - StringUtils.byteDesc(totalFlushableSize) + "/" + totalFlushableSize + - ", currentsize=" + - StringUtils.byteDesc(memstoresize) + "/" + memstoresize + - " for region " + this + " in " + time + "ms, sequenceid=" + flushSeqId + - ", compaction requested=" + compactionRequested + - ((wal == null)? "; wal=null": ""); + String msg = "Finished memstore flush of ~" + + StringUtils.byteDesc(totalFlushableSizeOfFlushableStores) + "/" + + totalFlushableSizeOfFlushableStores + ", currentsize=" + + StringUtils.byteDesc(memstoresize) + "/" + memstoresize + + " for region " + this + " in " + time + "ms, sequenceid=" + + flushSeqId + ", compaction requested=" + compactionRequested + + ((wal == null) ? 
"; wal=null" : ""); LOG.info(msg); status.setStatus(msg); @@ -2138,7 +2197,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // if(delete.getFamilyCellMap().isEmpty()){ for(byte [] family : this.htableDescriptor.getFamiliesKeys()){ // Don't eat the timestamp - delete.deleteFamily(family, delete.getTimeStamp()); + delete.addFamily(family, delete.getTimeStamp()); } } else { for(byte [] family : delete.getFamilyCellMap().keySet()) { @@ -2789,6 +2848,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // coprocessorHost.postBatchMutate(miniBatchOp); } + // ------------------------------------------------------------------ // STEP 8. Advance mvcc. This will make this put visible to scanners and getters. // ------------------------------------------------------------------ @@ -2820,7 +2880,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // success = true; return addedSize; } finally { - // if the wal sync was unsuccessful, remove keys from memstore if (doRollBackMemstore) { rollbackMemstore(memstoreCells); @@ -3179,8 +3238,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * We throw RegionTooBusyException if above memstore limit * and expect client to retry using some kind of backoff */ - private void checkResources() - throws RegionTooBusyException { + private void checkResources() throws RegionTooBusyException { // If catalog region, do not impose resource constraints or block updates. if (this.getRegionInfo().isMetaRegion()) return; @@ -3376,7 +3434,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // writestate.flushRequested = true; } // Make request outside of synchronize block; HBASE-818. - this.rsServices.getFlushRequester().requestFlush(this); + this.rsServices.getFlushRequester().requestFlush(this, false); if (LOG.isDebugEnabled()) { LOG.debug("Flush requested on " + this); } @@ -3497,7 +3555,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } if (seqid > minSeqIdForTheRegion) { // Then we added some edits to memory. Flush and cleanup split edit files. - internalFlushcache(null, seqid, status); + internalFlushcache(null, seqid, stores.values(), status); } // Now delete the content of recovered edits. We're done w/ them. for (Path file: files) { @@ -3644,7 +3702,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // editsCount++; } if (flush) { - internalFlushcache(null, currentEditSeqId, status); + internalFlushcache(null, currentEditSeqId, stores.values(), status); } if (coprocessorHost != null) { @@ -3992,7 +4050,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is // a sequence id that we can be sure is beyond the last hfile written). 
if (assignSeqId) { - FlushResult fs = this.flushcache(); + FlushResult fs = this.flushcache(true); if (fs.isFlushSucceeded()) { seqId = fs.flushSequenceId; } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { @@ -5025,8 +5083,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // FileSystem fs = a.getRegionFileSystem().getFileSystem(); // Make sure each region's cache is empty - a.flushcache(); - b.flushcache(); + a.flushcache(true); + b.flushcache(true); // Compact each region so we only have one store file per family a.compactStores(true); @@ -5140,7 +5198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // do after lock if (this.metricsRegion != null) { - long totalSize = 0l; + long totalSize = 0L; for (Cell cell : results) { totalSize += CellUtil.estimatedSerializedSizeOf(cell); } @@ -5308,7 +5366,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // to get a sequence id assigned which is done by FSWALEntry#stampRegionSequenceId walKey = this.appendEmptyEdit(this.wal, memstoreCells); } - // 9. Release region lock if (locked) { this.updatesLock.readLock().unlock(); @@ -5436,7 +5493,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // WALEdit walEdits = null; List allKVs = new ArrayList(append.size()); Map> tempMemstore = new HashMap>(); - long size = 0; long txid = 0; @@ -5638,7 +5694,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Append a faked WALEdit in order for SKIP_WAL updates to get mvcc assigned walKey = this.appendEmptyEdit(this.wal, memstoreCells); } - size = this.addAndGetGlobalMemstoreSize(size); flush = isFlushSize(size); } finally { @@ -5935,8 +5990,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // public static final long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + ClassSize.ARRAY + - 42 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + - (12 * Bytes.SIZEOF_LONG) + + 44 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT + + (11 * Bytes.SIZEOF_LONG) + 4 * Bytes.SIZEOF_BOOLEAN); // woefully out of date - currently missing: @@ -6507,6 +6562,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // return this.maxSeqIdInStores; } + @VisibleForTesting + public long getOldestSeqIdOfStore(byte[] familyName) { + return wal.getEarliestMemstoreSeqNum(getRegionInfo() + .getEncodedNameAsBytes(), familyName); + } + /** * @return if a given region is in compaction now. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 41ea09d..2fca49f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1484,7 +1484,7 @@ public class HRegionServer extends HasThread implements //Throttle the flushes by putting a delay. If we don't throttle, and there //is a balanced write-load on the regions in a table, we might end up //overwhelming the filesystem with too many flushes at once. 
- requester.requestDelayedFlush(r, randomDelay); + requester.requestDelayedFlush(r, randomDelay, true); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 6f5dfa4..aa60bfb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -166,7 +166,8 @@ class LogRoller extends HasThread { if (r != null) { requester = this.services.getFlushRequester(); if (requester != null) { - requester.requestFlush(r); + // force flushing all stores to clean old logs + requester.requestFlush(r, true); scheduled = true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 1d59701..cf8ad03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -39,10 +39,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; @@ -50,7 +50,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.htrace.Trace; import org.htrace.TraceScope; @@ -114,11 +114,11 @@ class MemStoreFlusher implements FlushRequester { 90000); int handlerCount = conf.getInt("hbase.hstore.flusher.count", 2); this.flushHandlers = new FlushHandler[handlerCount]; - LOG.info("globalMemStoreLimit=" + - StringUtils.humanReadableInt(this.globalMemStoreLimit) + - ", globalMemStoreLimitLowMark=" + - StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) + - ", maxHeap=" + StringUtils.humanReadableInt(max)); + LOG.info("globalMemStoreLimit=" + + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimit, "", 1) + + ", globalMemStoreLimitLowMark=" + + TraditionalBinaryPrefix.long2String(this.globalMemStoreLimitLowMark, "", 1) + + ", maxHeap=" + TraditionalBinaryPrefix.long2String(max, "", 1)); } public Counter getUpdatesBlockedMsHighWater() { @@ -160,13 +160,12 @@ class MemStoreFlusher implements FlushRequester { // lots of little flushes and cause lots of compactions, etc, which just makes // life worse! if (LOG.isDebugEnabled()) { - LOG.debug("Under global heap pressure: " + - "Region " + bestAnyRegion.getRegionNameAsString() + " has too many " + - "store files, but is " + - StringUtils.humanReadableInt(bestAnyRegion.memstoreSize.get()) + - " vs best flushable region's " + - StringUtils.humanReadableInt(bestFlushableRegion.memstoreSize.get()) + - ". 
Choosing the bigger."); + LOG.debug("Under global heap pressure: " + "Region " + + bestAnyRegion.getRegionNameAsString() + " has too many " + "store files, but is " + + TraditionalBinaryPrefix.long2String(bestAnyRegion.memstoreSize.get(), "", 1) + + " vs best flushable region's " + + TraditionalBinaryPrefix.long2String(bestFlushableRegion.memstoreSize.get(), "", 1) + + ". Choosing the bigger."); } regionToFlush = bestAnyRegion; } else { @@ -180,7 +179,7 @@ class MemStoreFlusher implements FlushRequester { Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); LOG.info("Flush of region " + regionToFlush + " due to global heap pressure"); - flushedOne = flushRegion(regionToFlush, true); + flushedOne = flushRegion(regionToFlush, true, true); if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + " - trying to find a different region to flush."); @@ -206,7 +205,7 @@ class MemStoreFlusher implements FlushRequester { if (fqe == null || fqe instanceof WakeupFlushThread) { if (isAboveLowWaterMark()) { LOG.debug("Flush thread woke up because memory above low water=" - + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); + + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1)); if (!flushOneForGlobalPressure()) { // Wasn't able to flush any region, but we're above low water mark // This is unlikely to happen, but might happen when closing the @@ -293,23 +292,23 @@ class MemStoreFlusher implements FlushRequester { getGlobalMemstoreSize() >= globalMemStoreLimitLowMark; } - public void requestFlush(HRegion r) { + public void requestFlush(HRegion r, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush // queue. It'll come out near immediately. - FlushRegionEntry fqe = new FlushRegionEntry(r); + FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); } } } - public void requestDelayedFlush(HRegion r, long delay) { + public void requestDelayedFlush(HRegion r, long delay, boolean forceFlushAllStores) { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay - FlushRegionEntry fqe = new FlushRegionEntry(r); + FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); fqe.requeue(delay); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); @@ -362,7 +361,7 @@ class MemStoreFlusher implements FlushRequester { } } - /* + /** * A flushRegion that checks store file count. If too many, puts the flush * on delay queue to retry later. * @param fqe @@ -406,22 +405,24 @@ class MemStoreFlusher implements FlushRequester { return true; } } - return flushRegion(region, false); + return flushRegion(region, false, fqe.isForceFlushAllStores()); } - /* + /** * Flush a region. * @param region Region to flush. * @param emergencyFlush Set if we are being force flushed. If true the region * needs to be removed from the flush queue. If false, when we were called * from the main flusher run loop and we got the entry to flush by calling * poll on the flush queue (which removed it). - * + * @param selectiveFlushRequest Do we want to selectively flush only the + * column families that dominate the memstore size? * @return true if the region was successfully flushed, false otherwise. If * false, there will be accompanying log messages explaining why the log was * not flushed. 
*/ - private boolean flushRegion(final HRegion region, final boolean emergencyFlush) { + private boolean flushRegion(final HRegion region, final boolean emergencyFlush, + boolean forceFlushAllStores) { long startTime = 0; synchronized (this.regionsInQueue) { FlushRegionEntry fqe = this.regionsInQueue.remove(region); @@ -444,7 +445,7 @@ class MemStoreFlusher implements FlushRequester { lock.readLock().lock(); try { notifyFlushRequest(region, emergencyFlush); - HRegion.FlushResult flushResult = region.flushcache(); + HRegion.FlushResult flushResult = region.flushcache(forceFlushAllStores); boolean shouldCompact = flushResult.isCompactionNeeded(); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; @@ -528,11 +529,12 @@ class MemStoreFlusher implements FlushRequester { while (isAboveHighWaterMark() && !server.isStopped()) { if (!blocked) { startTime = EnvironmentEdgeManager.currentTime(); - LOG.info("Blocking updates on " + server.toString() + - ": the global memstore size " + - StringUtils.humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + - " is >= than blocking " + - StringUtils.humanReadableInt(globalMemStoreLimit) + " size"); + LOG.info("Blocking updates on " + + server.toString() + + ": the global memstore size " + + TraditionalBinaryPrefix.long2String(server.getRegionServerAccounting() + .getGlobalMemstoreSize(), "", 1) + " is >= than blocking " + + TraditionalBinaryPrefix.long2String(globalMemStoreLimit, "", 1) + " size"); } blocked = true; wakeupFlushThread(); @@ -656,10 +658,13 @@ class MemStoreFlusher implements FlushRequester { private long whenToExpire; private int requeueCount = 0; - FlushRegionEntry(final HRegion r) { + private boolean forceFlushAllStores; + + FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) { this.region = r; this.createTime = EnvironmentEdgeManager.currentTime(); this.whenToExpire = this.createTime; + this.forceFlushAllStores = forceFlushAllStores; } /** @@ -679,6 +684,13 @@ class MemStoreFlusher implements FlushRequester { } /** + * @return whether we need to flush all stores. + */ + public boolean isForceFlushAllStores() { + return forceFlushAllStores; + } + + /** * @param when When to expire, when to come up out of the queue. * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() * to whatever you pass. 
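Note for anyone trying the patch out: the per-family policy and its lower bound can be set globally (hbase.regionserver.flush.policy and hbase.hregion.percolumnfamilyflush.size.lower.bound above) or per table. The following client-side sketch is illustrative only and is not part of the patch; the table and family names and the 8MB bound are made up, while setFlushPolicyClassName() and the FlushLargeStoresPolicy.flush_size_lower_bound table attribute are the ones introduced above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class PerFamilyFlushSetup {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t"));
    htd.addFamily(new HColumnDescriptor("f1")); // frequently written family
    htd.addFamily(new HColumnDescriptor("f2")); // rarely written family
    // Pick the per-column-family policy for this table (it is also the
    // global default through hbase.regionserver.flush.policy).
    htd.setFlushPolicyClassName(
        "org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy");
    // Per-table lower bound in bytes; when absent, the policy falls back to
    // hbase.hregion.percolumnfamilyflush.size.lower.bound (16MB by default).
    htd.setValue("FlushLargeStoresPolicy.flush_size_lower_bound",
        String.valueOf(8L * 1024 * 1024));
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      admin.createTable(htd);
    } finally {
      admin.close();
    }
  }
}

With this in place, a region whose total memstore crosses hbase.hregion.memstore.flush.size flushes only the families above the 8MB bound, so a rarely written family like f2 keeps its memstore and avoids producing many tiny store files.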
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 8821803..942fc0d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -70,7 +70,6 @@ import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; -import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; @@ -149,8 +148,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion.Operation; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; @@ -158,6 +155,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.hadoop.net.DNS; import org.apache.zookeeper.KeeperException; @@ -692,7 +691,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, */ private OperationStatus [] doReplayBatchOp(final HRegion region, final List mutations, long replaySeqId) throws IOException { - long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; try { @@ -715,6 +713,19 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } it.remove(); + } else { + // filter out stores that already flushed. + // This could happen if we turn on per family flush. 
+ Map maxStoreSeqId = region.getMaxStoreSeqIdForLogReplay(); + for (Iterator storeIter = map.keySet().iterator(); storeIter.hasNext();) { + Long maxSeqId = maxStoreSeqId.get(storeIter.next()); + if (maxSeqId != null && replaySeqId <= maxSeqId.longValue()) { + storeIter.remove(); + } + } + if (map.isEmpty()) { + it.remove(); + } } } requestCount.add(mutations.size()); @@ -1074,7 +1085,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, LOG.info("Flushing " + region.getRegionNameAsString()); boolean shouldFlush = true; if (request.hasIfOlderThanTs()) { - shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs(); + shouldFlush = region.getEarliestFlushTimeForAllStores() < request.getIfOlderThanTs(); } FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder(); if (shouldFlush) { @@ -1091,7 +1102,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } builder.setFlushed(result); } - builder.setLastFlushTime(region.getLastFlushTime()); + builder.setLastFlushTime( region.getEarliestFlushTimeForAllStores()); return builder.build(); } catch (DroppedSnapshotException ex) { // Cache flush can fail in a few places. If it fails in a critical diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index ced3383..d2ba69d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; + import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; @@ -31,10 +33,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; @@ -50,7 +54,6 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -64,15 +67,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import static org.apache.hadoop.hbase.wal.DefaultWALProvider.WAL_FILE_NAME_DELIMITER; -import org.apache.hadoop.hbase.wal.DefaultWALProvider; -import org.apache.hadoop.hbase.wal.WAL; -import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALPrettyPrinter; -import org.apache.hadoop.hbase.wal.WALProvider.Writer; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.DrainBarrier; @@ -80,6 
+75,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALPrettyPrinter; +import org.apache.hadoop.hbase.wal.WALProvider.Writer; +import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.util.StringUtils; import org.htrace.NullScope; @@ -88,6 +90,7 @@ import org.htrace.Trace; import org.htrace.TraceScope; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.ExceptionHandler; @@ -333,33 +336,35 @@ public class FSHLog implements WAL { // sequence id numbers are by region and unrelated to the ring buffer sequence number accounting // done above in failedSequence, highest sequence, etc. /** - * This lock ties all operations on oldestFlushingRegionSequenceIds and - * oldestFlushedRegionSequenceIds Maps with the exception of append's putIfAbsent call into - * oldestUnflushedSeqNums. We use these Maps to find out the low bound regions sequence id, or - * to find regions with old sequence ids to force flush; we are interested in old stuff not the - * new additions (TODO: IS THIS SAFE? CHECK!). + * This lock ties all operations on lowestFlushingStoreSequenceIds and + * oldestUnflushedStoreSequenceIds Maps with the exception of append's putIfAbsent call into + * oldestUnflushedStoreSequenceIds. We use these Maps to find out the low bound regions + * sequence id, or to find regions with old sequence ids to force flush; we are interested in + * old stuff not the new additions (TODO: IS THIS SAFE? CHECK!). */ private final Object regionSequenceIdLock = new Object(); /** - * Map of encoded region names to their OLDEST -- i.e. their first, the longest-lived -- - * sequence id in memstore. Note that this sequence id is the region sequence id. This is not - * related to the id we use above for {@link #highestSyncedSequence} and - * {@link #highestUnsyncedSequence} which is the sequence from the disruptor ring buffer. + * Map of encoded region names and family names to their OLDEST -- i.e. their first, + * the longest-lived -- sequence id in memstore. Note that this sequence id is the region + * sequence id. This is not related to the id we use above for {@link #highestSyncedSequence} + * and {@link #highestUnsyncedSequence} which is the sequence from the disruptor + * ring buffer. */ - private final ConcurrentSkipListMap oldestUnflushedRegionSequenceIds = - new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + private final ConcurrentMap> oldestUnflushedStoreSequenceIds + = new ConcurrentSkipListMap>( + Bytes.BYTES_COMPARATOR); /** - * Map of encoded region names to their lowest or OLDEST sequence/edit id in memstore currently - * being flushed out to hfiles. Entries are moved here from - * {@link #oldestUnflushedRegionSequenceIds} while the lock {@link #regionSequenceIdLock} is held + * Map of encoded region names and family names to their lowest or OLDEST sequence/edit id in + * memstore currently being flushed out to hfiles. 
Entries are moved here from + * {@link #oldestUnflushedStoreSequenceIds} while the lock {@link #regionSequenceIdLock} is held * (so movement between the Maps is atomic). This is not related to the id we use above for * {@link #highestSyncedSequence} and {@link #highestUnsyncedSequence} which is the sequence from * the disruptor ring buffer, an internal detail. */ - private final Map lowestFlushingRegionSequenceIds = - new TreeMap(Bytes.BYTES_COMPARATOR); + private final Map> lowestFlushingStoreSequenceIds = + new TreeMap>(Bytes.BYTES_COMPARATOR); /** * Map of region encoded names to the latest region sequence id. Updated on each append of @@ -734,6 +739,28 @@ public class FSHLog implements WAL { return DefaultWALProvider.createWriter(conf, fs, path, false); } + private long getLowestSeqId(Map seqIdMap) { + long result = HConstants.NO_SEQNUM; + for (Long seqNum: seqIdMap.values()) { + if (result == HConstants.NO_SEQNUM || seqNum.longValue() < result) { + result = seqNum.longValue(); + } + } + return result; + } + + private > Map copyMapWithLowestSeqId( + Map mapToCopy) { + Map copied = Maps.newHashMap(); + for (Map.Entry entry: mapToCopy.entrySet()) { + long lowestSeqId = getLowestSeqId(entry.getValue()); + if (lowestSeqId != HConstants.NO_SEQNUM) { + copied.put(entry.getKey(), lowestSeqId); + } + } + return copied; + } + /** * Archive old logs that could be archived: a log is eligible for archiving if all its WALEdits * have been flushed to hfiles. @@ -746,22 +773,23 @@ public class FSHLog implements WAL { * @throws IOException */ private void cleanOldLogs() throws IOException { - Map oldestFlushingSeqNumsLocal = null; - Map oldestUnflushedSeqNumsLocal = null; + Map lowestFlushingRegionSequenceIdsLocal = null; + Map oldestUnflushedRegionSequenceIdsLocal = null; List logsToArchive = new ArrayList(); // make a local copy so as to avoid locking when we iterate over these maps. synchronized (regionSequenceIdLock) { - oldestFlushingSeqNumsLocal = new HashMap(this.lowestFlushingRegionSequenceIds); - oldestUnflushedSeqNumsLocal = - new HashMap(this.oldestUnflushedRegionSequenceIds); + lowestFlushingRegionSequenceIdsLocal = + copyMapWithLowestSeqId(this.lowestFlushingStoreSequenceIds); + oldestUnflushedRegionSequenceIdsLocal = + copyMapWithLowestSeqId(this.oldestUnflushedStoreSequenceIds); } for (Map.Entry> e : byWalRegionSequenceIds.entrySet()) { // iterate over the log file. Path log = e.getKey(); Map sequenceNums = e.getValue(); // iterate over the map for this log file, and tell whether it should be archive or not. - if (areAllRegionsFlushed(sequenceNums, oldestFlushingSeqNumsLocal, - oldestUnflushedSeqNumsLocal)) { + if (areAllRegionsFlushed(sequenceNums, lowestFlushingRegionSequenceIdsLocal, + oldestUnflushedRegionSequenceIdsLocal)) { logsToArchive.add(log); LOG.debug("WAL file ready for archiving " + log); } @@ -815,10 +843,11 @@ public class FSHLog implements WAL { List regionsToFlush = null; // Keeping the old behavior of iterating unflushedSeqNums under oldestSeqNumsLock. 
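A note on cleanOldLogs above: with per-family accounting, a region no longer has a single oldest unflushed sequence id, so the bound compared against each WAL file is the minimum across that region's families. Below is a minimal, self-contained sketch of that archivability rule; the names are hypothetical and String keys stand in for byte[].

import java.util.HashMap;
import java.util.Map;

/** Simplified model (hypothetical names, String keys instead of byte[]) of the
 *  archivability check performed by cleanOldLogs/areAllRegionsFlushed. */
public class WalArchiveCheck {
  static final long NO_SEQNUM = -1;

  /** Region lower bound = minimum oldest-unflushed id across its families. */
  static long lowestSeqId(Map<String, Long> familySeqIds) {
    long result = NO_SEQNUM;
    for (long seqNum : familySeqIds.values()) {
      if (result == NO_SEQNUM || seqNum < result) {
        result = seqNum;
      }
    }
    return result;
  }

  /** A WAL file is archivable iff, for every region with edits in the file, the
   *  file's highest sequence id is below the region's oldest unflushed edit. */
  static boolean archivable(Map<String, Long> highestSeqIdInFile,
      Map<String, Map<String, Long>> unflushedByRegion) {
    for (Map.Entry<String, Long> e : highestSeqIdInFile.entrySet()) {
      Map<String, Long> families = unflushedByRegion.get(e.getKey());
      long oldestUnflushed = families == null ? NO_SEQNUM : lowestSeqId(families);
      if (oldestUnflushed != NO_SEQNUM && oldestUnflushed <= e.getValue()) {
        return false; // some family still holds an edit this file covers
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, Long> families = new HashMap<>();
    families.put("f1", 15L);
    families.put("f2", 40L);
    Map<String, Map<String, Long>> unflushed = new HashMap<>();
    unflushed.put("region-A", families);
    Map<String, Long> file = new HashMap<>();
    file.put("region-A", 30L);
    // f1 (seq 15) is still unflushed and <= 30, so the file must be retained.
    System.out.println(archivable(file, unflushed)); // false
    families.remove("f1"); // f1 flushed; the region's bound is now 40 > 30
    System.out.println(archivable(file, unflushed)); // true
  }
}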
   synchronized (regionSequenceIdLock) {
-      for (Map.Entry<byte[], Long> e : regionsSequenceNums.entrySet()) {
-        Long unFlushedVal = this.oldestUnflushedRegionSequenceIds.get(e.getKey());
-        if (unFlushedVal != null && unFlushedVal <= e.getValue()) {
-          if (regionsToFlush == null) regionsToFlush = new ArrayList<byte[]>();
+      for (Map.Entry<byte[], Long> e: regionsSequenceNums.entrySet()) {
+        long unFlushedVal = getEarliestMemstoreSeqNum(e.getKey());
+        if (unFlushedVal != HConstants.NO_SEQNUM && unFlushedVal <= e.getValue()) {
+          if (regionsToFlush == null)
+            regionsToFlush = new ArrayList<byte[]>();
           regionsToFlush.add(e.getKey());
         }
       }
@@ -1584,36 +1613,53 @@ public class FSHLog implements WAL {
     // +1 for current use log
     return getNumRolledLogFiles() + 1;
   }
-
+
   // public only until class moves to o.a.h.h.wal
   /** @return the size of log files in use */
   public long getLogFileSize() {
     return this.totalLogSize.get();
   }
-
+
   @Override
-  public boolean startCacheFlush(final byte[] encodedRegionName) {
-    Long oldRegionSeqNum = null;
+  public boolean startCacheFlush(final byte[] encodedRegionName,
+      Set<byte[]> flushedFamilyNames) {
+    Map<byte[], Long> oldStoreSeqNum = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
     if (!closeBarrier.beginOp()) {
       LOG.info("Flush will not be started for " + Bytes.toString(encodedRegionName) +
         " - because the server is closing.");
       return false;
     }
     synchronized (regionSequenceIdLock) {
-      oldRegionSeqNum = this.oldestUnflushedRegionSequenceIds.remove(encodedRegionName);
-      if (oldRegionSeqNum != null) {
-        Long oldValue =
-          this.lowestFlushingRegionSequenceIds.put(encodedRegionName, oldRegionSeqNum);
-        assert oldValue ==
-          null : "Flushing map not cleaned up for " + Bytes.toString(encodedRegionName);
+      ConcurrentMap<byte[], Long> oldestUnflushedStoreSequenceIdsOfRegion =
+          oldestUnflushedStoreSequenceIds.get(encodedRegionName);
+      if (oldestUnflushedStoreSequenceIdsOfRegion != null) {
+        for (byte[] familyName: flushedFamilyNames) {
+          Long seqId = oldestUnflushedStoreSequenceIdsOfRegion.remove(familyName);
+          if (seqId != null) {
+            oldStoreSeqNum.put(familyName, seqId);
+          }
+        }
+        if (!oldStoreSeqNum.isEmpty()) {
+          Map<byte[], Long> oldValue = this.lowestFlushingStoreSequenceIds.put(
+              encodedRegionName, oldStoreSeqNum);
+          assert oldValue == null: "Flushing map not cleaned up for "
+              + Bytes.toString(encodedRegionName);
+        }
+        if (oldestUnflushedStoreSequenceIdsOfRegion.isEmpty()) {
+          // Remove the entry, otherwise it stays in oldestUnflushedStoreSequenceIds forever,
+          // even after the region has moved to another server.
+          // Do not worry about data races: we hold the region's write lock when calling
+          // startCacheFlush, so no one can add values to the map we just removed.
+          oldestUnflushedStoreSequenceIds.remove(encodedRegionName);
+        }
      }
    }
-    if (oldRegionSeqNum == null) {
-      // TODO: if we have no oldRegionSeqNum, and WAL is not disabled, presumably either
-      // the region is already flushing (which would make this call invalid), or there
-      // were no appends after last flush, so why are we starting flush? Maybe we should
-      // assert not null, and switch to "long" everywhere. Less rigorous, but safer,
-      // alternative is telling the caller to stop. For now preserve old logic.
+    if (oldStoreSeqNum.isEmpty()) {
+      // TODO: if we have no oldStoreSeqNum, and WAL is not disabled, presumably either
+      // the region is already flushing (which would make this call invalid), or there
+      // were no appends after last flush, so why are we starting flush? Maybe we should
+      // assert not empty. A less rigorous but safer alternative is telling the caller to
+      // stop. For now preserve old logic.
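Before the warning that follows, it may help to spell out the caller's side of the new two-argument handshake. This is an illustrative sketch of the order of WAL calls during a selective flush, not the HRegion implementation; the family name is made up.

import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;

/** Illustrative caller-side flow for the per-family flush handshake. */
public class SelectiveFlushSketch {
  static void flushSomeStores(WAL wal, byte[] encodedRegionName) throws IOException {
    // The flush policy picked these families; here they are hard-coded.
    Set<byte[]> familiesToFlush = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    familiesToFlush.add(Bytes.toBytes("f1"));

    // Moves the chosen families' oldest-unflushed ids into the flushing map.
    if (!wal.startCacheFlush(encodedRegionName, familiesToFlush)) {
      return; // WAL is closing; give up.
    }
    try {
      // ... snapshot and write the chosen memstores to hfiles here ...
      wal.completeCacheFlush(encodedRegionName); // drop the flushing-map entry
    } catch (IOException e) {
      wal.abortCacheFlush(encodedRegionName);    // restore ids so the WAL keeps its logs
      throw e;
    }
  }
}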
LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: [" + Bytes.toString(encodedRegionName) + "]"); } @@ -1623,30 +1669,59 @@ public class FSHLog implements WAL { @Override public void completeCacheFlush(final byte [] encodedRegionName) { synchronized (regionSequenceIdLock) { - this.lowestFlushingRegionSequenceIds.remove(encodedRegionName); + this.lowestFlushingStoreSequenceIds.remove(encodedRegionName); } closeBarrier.endOp(); } + private ConcurrentMap getOrCreateOldestUnflushedStoreSequenceIdsOfRegion( + byte[] encodedRegionName) { + ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = + oldestUnflushedStoreSequenceIds.get(encodedRegionName); + if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + return oldestUnflushedStoreSequenceIdsOfRegion; + } + oldestUnflushedStoreSequenceIdsOfRegion = + new ConcurrentSkipListMap(Bytes.BYTES_COMPARATOR); + ConcurrentMap alreadyPut = + oldestUnflushedStoreSequenceIds.put(encodedRegionName, + oldestUnflushedStoreSequenceIdsOfRegion); + return alreadyPut == null ? oldestUnflushedStoreSequenceIdsOfRegion : alreadyPut; + } + @Override public void abortCacheFlush(byte[] encodedRegionName) { - Long currentSeqNum = null, seqNumBeforeFlushStarts = null; + Map storeSeqNumsBeforeFlushStarts; + Map currentStoreSeqNums = new TreeMap(Bytes.BYTES_COMPARATOR); synchronized (regionSequenceIdLock) { - seqNumBeforeFlushStarts = this.lowestFlushingRegionSequenceIds.remove(encodedRegionName); - if (seqNumBeforeFlushStarts != null) { - currentSeqNum = - this.oldestUnflushedRegionSequenceIds.put(encodedRegionName, seqNumBeforeFlushStarts); + storeSeqNumsBeforeFlushStarts = this.lowestFlushingStoreSequenceIds.remove( + encodedRegionName); + if (storeSeqNumsBeforeFlushStarts != null) { + ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = + getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); + for (Map.Entry familyNameAndSeqId: storeSeqNumsBeforeFlushStarts + .entrySet()) { + currentStoreSeqNums.put(familyNameAndSeqId.getKey(), + oldestUnflushedStoreSequenceIdsOfRegion.put(familyNameAndSeqId.getKey(), + familyNameAndSeqId.getValue())); + } } } closeBarrier.endOp(); - if ((currentSeqNum != null) - && (currentSeqNum.longValue() <= seqNumBeforeFlushStarts.longValue())) { - String errorStr = "Region " + Bytes.toString(encodedRegionName) + - "acquired edits out of order current memstore seq=" + currentSeqNum - + ", previous oldest unflushed id=" + seqNumBeforeFlushStarts; - LOG.error(errorStr); - assert false : errorStr; - Runtime.getRuntime().halt(1); + if (storeSeqNumsBeforeFlushStarts != null) { + for (Map.Entry familyNameAndSeqId : storeSeqNumsBeforeFlushStarts.entrySet()) { + Long currentSeqNum = currentStoreSeqNums.get(familyNameAndSeqId.getKey()); + if (currentSeqNum != null + && currentSeqNum.longValue() <= familyNameAndSeqId.getValue().longValue()) { + String errorStr = + "Region " + Bytes.toString(encodedRegionName) + " family " + + Bytes.toString(familyNameAndSeqId.getKey()) + + " acquired edits out of order current memstore seq=" + currentSeqNum + + ", previous oldest unflushed id=" + familyNameAndSeqId.getValue(); + LOG.error(errorStr); + Runtime.getRuntime().halt(1); + } + } } } @@ -1677,8 +1752,23 @@ public class FSHLog implements WAL { @Override public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { - Long result = oldestUnflushedRegionSequenceIds.get(encodedRegionName); - return result == null ? 
HConstants.NO_SEQNUM : result.longValue(); + ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = + this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); + return oldestUnflushedStoreSequenceIdsOfRegion != null ? + getLowestSeqId(oldestUnflushedStoreSequenceIdsOfRegion) : HConstants.NO_SEQNUM; + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, + byte[] familyName) { + ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = + this.oldestUnflushedStoreSequenceIds.get(encodedRegionName); + if (oldestUnflushedStoreSequenceIdsOfRegion != null) { + Long result = oldestUnflushedStoreSequenceIdsOfRegion.get(familyName); + return result != null ? result.longValue() : HConstants.NO_SEQNUM; + } else { + return HConstants.NO_SEQNUM; + } } /** @@ -1914,6 +2004,15 @@ public class FSHLog implements WAL { } } + private void updateOldestUnflushedSequenceIds(byte[] encodedRegionName, + Set familyNameSet, Long lRegionSequenceId) { + ConcurrentMap oldestUnflushedStoreSequenceIdsOfRegion = + getOrCreateOldestUnflushedStoreSequenceIdsOfRegion(encodedRegionName); + for (byte[] familyName : familyNameSet) { + oldestUnflushedStoreSequenceIdsOfRegion.putIfAbsent(familyName, lRegionSequenceId); + } + } + /** * Append to the WAL. Does all CP and WAL listener calls. * @param entry @@ -1961,9 +2060,10 @@ public class FSHLog implements WAL { Long lRegionSequenceId = Long.valueOf(regionSequenceId); highestRegionSequenceIds.put(encodedRegionName, lRegionSequenceId); if (entry.isInMemstore()) { - oldestUnflushedRegionSequenceIds.putIfAbsent(encodedRegionName, lRegionSequenceId); + updateOldestUnflushedSequenceIds(encodedRegionName, + entry.getFamilyNames(), lRegionSequenceId); } - + coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); // Update metrics. 
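The append bookkeeping just above relies on putIfAbsent into the per-region family map: only the first edit a family sees after its last flush pins that family's oldest unflushed id, and later edits leave it untouched. A toy model of that accounting (String keys for brevity, not the FSHLog code):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Toy model of the append path: the FIRST edit per family after a flush
 *  pins that family's oldest unflushed sequence id. */
public class OldestUnflushedSketch {
  static final ConcurrentMap<String, ConcurrentMap<String, Long>> byRegion =
      new ConcurrentHashMap<>();

  static void onAppend(String region, Set<String> families, long seqId) {
    ConcurrentMap<String, Long> byFamily = byRegion.get(region);
    if (byFamily == null) {
      ConcurrentMap<String, Long> created = new ConcurrentHashMap<>();
      byFamily = byRegion.putIfAbsent(region, created);
      if (byFamily == null) {
        byFamily = created;
      }
    }
    for (String family : families) {
      byFamily.putIfAbsent(family, seqId); // only the first append wins
    }
  }

  static Set<String> fams(String... names) {
    return new HashSet<>(Arrays.asList(names));
  }

  public static void main(String[] args) {
    onAppend("region-A", fams("f1"), 10);
    onAppend("region-A", fams("f1", "f2"), 11);
    onAppend("region-A", fams("f2"), 12);
    System.out.println(byRegion.get("region-A").get("f1")); // 10
    System.out.println(byRegion.get("region-A").get("f2")); // 11
    // The region's lower bound is therefore 10.
  }
}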
       postAppend(entry, EnvironmentEdgeManager.currentTime() - start);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index d9942b3..4430922 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -19,13 +19,21 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CollectionUtils;
+
+import com.google.common.collect.Sets;
 
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -96,7 +104,7 @@ class FSWALEntry extends Entry {
    */
   long stampRegionSequenceId() throws IOException {
     long regionSequenceId = this.regionSequenceIdReference.incrementAndGet();
-    if (!this.getEdit().isReplay() && memstoreCells != null && !memstoreCells.isEmpty()) {
+    if (!this.getEdit().isReplay() && !CollectionUtils.isEmpty(memstoreCells)) {
       for (Cell cell : this.memstoreCells) {
         CellUtil.setSequenceId(cell, regionSequenceId);
       }
@@ -105,4 +113,19 @@
     key.setLogSeqNum(regionSequenceId);
     return regionSequenceId;
   }
+
+  /**
+   * @return the family names which are affected by this edit.
+ */ + Set getFamilyNames() { + ArrayList cells = this.getEdit().getCells(); + if (CollectionUtils.isEmpty(cells)) { + return Collections.emptySet(); + } + Set familySet = Sets.newTreeSet(Bytes.BYTES_COMPARATOR); + for (Cell cell: cells) { + familySet.add(CellUtil.cloneFamily(cell)); + } + return familySet; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index e0fc35c..f571166 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -182,7 +183,7 @@ class DisabledWALProvider implements WALProvider { } @Override - public boolean startCacheFlush(final byte[] encodedRegionName) { + public boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames) { return !(closed.get()); } @@ -205,6 +206,11 @@ class DisabledWALProvider implements WALProvider { } @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + return HConstants.NO_SEQNUM; + } + + @Override public String toString() { return "WAL disabled."; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java index 787a34b..91c021a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.wal; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -149,7 +150,7 @@ public interface WAL { * @return true if the flush can proceed, false in case wal is closing (ususally, when server is * closing) and flush couldn't be started. */ - boolean startCacheFlush(final byte[] encodedRegionName); + boolean startCacheFlush(final byte[] encodedRegionName, Set flushedFamilyNames); /** * Complete the cache flush. @@ -179,6 +180,14 @@ public interface WAL { long getEarliestMemstoreSeqNum(byte[] encodedRegionName); /** + * Gets the earliest sequence number in the memstore for this particular region and store. + * @param encodedRegionName The region to get the number for. + * @param familyName The family to get the number for. + * @return The number if present, HConstants.NO_SEQNUM if absent. + */ + long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName); + + /** * Human readable identifying information about the state of this WAL. * Implementors are encouraged to include information appropriate for debugging. 
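A subtlety behind getFamilyNames above: byte[] inherits identity-based equals and hashCode, so deduplicating family names needs a comparator-based set rather than a HashSet. A small self-contained demonstration:

import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.util.Bytes;

/** Why getFamilyNames uses a TreeSet with BYTES_COMPARATOR. */
public class ByteArraySetSketch {
  public static void main(String[] args) {
    Set<byte[]> hashed = new HashSet<>();
    hashed.add(Bytes.toBytes("f1"));
    hashed.add(Bytes.toBytes("f1"));
    System.out.println(hashed.size()); // 2 -- duplicates survive

    Set<byte[]> sorted = new TreeSet<>(Bytes.BYTES_COMPARATOR);
    sorted.add(Bytes.toBytes("f1"));
    sorted.add(Bytes.toBytes("f1"));
    System.out.println(sorted.size()); // 1 -- compared by content
  }
}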
* Consumers are advised not to rely on the details of the returned String; it does diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index a9493c7..777ecea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -268,7 +268,7 @@ public class TestIOFencing { compactingRegion = (CompactionBlockerRegion)testRegions.get(0); LOG.info("Blocking compactions"); compactingRegion.stopCompactions(); - long lastFlushTime = compactingRegion.getLastFlushTime(); + long lastFlushTime = compactingRegion.getEarliestFlushTimeForAllStores(); // Load some rows TEST_UTIL.loadNumericRows(table, FAMILY, 0, FIRST_BATCH_COUNT); @@ -284,7 +284,7 @@ public class TestIOFencing { // Wait till flush has happened, otherwise there won't be multiple store files long startWaitTime = System.currentTimeMillis(); - while (compactingRegion.getLastFlushTime() <= lastFlushTime || + while (compactingRegion.getEarliestFlushTimeForAllStores() <= lastFlushTime || compactingRegion.countStoreFiles() <= 1) { LOG.info("Waiting for the region to flush " + compactingRegion.getRegionNameAsString()); Thread.sleep(1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java index ace24b1..676885b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java @@ -34,8 +34,8 @@ public class TestFlushRegionEntry { @Test public void test() { - FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class)); - FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class)); + FlushRegionEntry entry = new FlushRegionEntry(Mockito.mock(HRegion.class), true); + FlushRegionEntry other = new FlushRegionEntry(Mockito.mock(HRegion.class), true); assertEquals(entry.hashCode(), other.hashCode()); assertEquals(entry, other); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 91de97c..c1eeea0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -113,11 +113,11 @@ public class TestHeapMemoryManager { long oldBlockCacheSize = blockCache.maxSize; heapMemoryManager.start(); memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); memStoreFlusher.flushType = FlushType.ABOVE_LOWER_MARK; - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); Thread.sleep(1500); // Allow the tuner to run once and do necessary memory up assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -127,8 +127,8 @@ public class TestHeapMemoryManager { oldBlockCacheSize = blockCache.maxSize; // Do some more flushes before the next run of HeapMemoryTuner 
memStoreFlusher.flushType = FlushType.ABOVE_HIGHER_MARK; - memStoreFlusher.requestFlush(null); - memStoreFlusher.requestFlush(null); + memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false); Thread.sleep(1500); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_STEP_VALUE, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -408,12 +408,12 @@ public class TestHeapMemoryManager { } @Override - public void requestFlush(HRegion region) { + public void requestFlush(HRegion region, boolean forceFlushAllStores) { this.listener.flushRequested(flushType, region); } @Override - public void requestDelayedFlush(HRegion region, long delay) { + public void requestDelayedFlush(HRegion region, long delay, boolean forceFlushAllStores) { } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java new file mode 100644 index 0000000..c7050e1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPerColumnFamilyFlush.java @@ -0,0 +1,638 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; +import org.apache.hadoop.hbase.regionserver.DefaultMemStore; +import org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy; +import org.apache.hadoop.hbase.regionserver.FlushLargeStoresPolicy; +import org.apache.hadoop.hbase.regionserver.FlushPolicy; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.wal.FSHLog; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import com.google.common.hash.Hashing; + +/** + * This test verifies the correctness of the Per Column Family flushing strategy + */ +@Category(MediumTests.class) +public class TestPerColumnFamilyFlush { + private static final Log LOG = LogFactory.getLog(TestPerColumnFamilyFlush.class); + + HRegion region = null; + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static final Path DIR = TEST_UTIL.getDataTestDir("TestHRegion"); + + public static final TableName TABLENAME = TableName.valueOf("TestPerColumnFamilyFlush", "t1"); + + public static final byte[][] families = { Bytes.toBytes("f1"), Bytes.toBytes("f2"), + Bytes.toBytes("f3"), Bytes.toBytes("f4"), Bytes.toBytes("f5") }; + + public static final byte[] FAMILY1 = families[0]; + + public static final byte[] FAMILY2 = families[1]; + + public static final byte[] FAMILY3 = families[2]; + + private void initHRegion(String callingMethod, Configuration conf) throws IOException { + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + for (byte[] family : families) { + htd.addFamily(new HColumnDescriptor(family)); + } + HRegionInfo info = new HRegionInfo(TABLENAME, null, null, false); + Path path = new Path(DIR, callingMethod); + region = HRegion.createHRegion(info, path, conf, htd); + } + + // A helper function to create puts. 
+  private Put createPut(int familyNum, int putNum) {
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    Put p = new Put(row);
+    p.add(families[familyNum - 1], qf, val);
+    return p;
+  }
+
+  // A helper function to create gets.
+  private Get createGet(int familyNum, int putNum) {
+    byte[] row = Bytes.toBytes("row" + familyNum + "-" + putNum);
+    return new Get(row);
+  }
+
+  // A helper function to verify edits.
+  void verifyEdit(int familyNum, int putNum, HTable table) throws IOException {
+    Result r = table.get(createGet(familyNum, putNum));
+    byte[] family = families[familyNum - 1];
+    byte[] qf = Bytes.toBytes("q" + familyNum);
+    byte[] val = Bytes.toBytes("val" + familyNum + "-" + putNum);
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum), r.getFamilyMap(family));
+    assertNotNull(("Missing Put#" + putNum + " for CF# " + familyNum),
+      r.getFamilyMap(family).get(qf));
+    assertTrue(("Incorrect value for Put#" + putNum + " for CF# " + familyNum),
+      Arrays.equals(r.getFamilyMap(family).get(qf), val));
+  }
+
+  @Test
+  public void testSelectiveFlushWhenEnabled() throws IOException {
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100 * 1024);
+    // Initialize the HRegion
+    initHRegion("testSelectiveFlushWhenEnabled", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the smallest LSNs for edits with respect to each CF.
+    long smallestSeqCF1 = region.getOldestSeqIdOfStore(FAMILY1);
+    long smallestSeqCF2 = region.getOldestSeqIdOfStore(FAMILY2);
+    long smallestSeqCF3 = region.getOldestSeqIdOfStore(FAMILY3);
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Get the overall smallest LSN in the region's memstores.
+    long smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // The overall smallest LSN in the region's memstores should be the same as
+    // the LSN of the smallest edit in CF1
+    assertEquals(smallestSeqCF1, smallestSeqInRegionCurrentMemstore);
+
+    // Some other sanity checks.
+    assertTrue(smallestSeqCF1 < smallestSeqCF2);
+    assertTrue(smallestSeqCF2 < smallestSeqCF3);
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+        + cf2MemstoreSize + cf3MemstoreSize);
+
+    // Flush!
+    region.flushcache(false);
+
+    // Will use these to check if anything changed.
+ long oldCF2MemstoreSize = cf2MemstoreSize; + long oldCF3MemstoreSize = cf3MemstoreSize; + + // Recalculate everything + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + smallestSeqInRegionCurrentMemstore = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + // We should have cleared out only CF1, since we chose the flush thresholds + // and number of puts accordingly. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + // Nothing should have happened to CF2, ... + assertEquals(cf2MemstoreSize, oldCF2MemstoreSize); + // ... or CF3 + assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); + // Now the smallest LSN in the region should be the same as the smallest + // LSN in the memstore of CF2. + assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF2); + // Of course, this should hold too. + assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize + + cf3MemstoreSize); + + // Now add more puts (mostly for CF2), so that we only flush CF2 this time. + for (int i = 1200; i < 2400; i++) { + region.put(createPut(2, i)); + + // Add only 100 puts for CF3 + if (i - 1200 < 100) { + region.put(createPut(3, i)); + } + } + + // How much does the CF3 memstore occupy? Will be used later. + oldCF3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + + // Flush again + region.flushcache(false); + + // Recalculate everything + cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize(); + cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize(); + cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize(); + totalMemstoreSize = region.getMemstoreSize().get(); + smallestSeqInRegionCurrentMemstore = + region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes()); + + // CF1 and CF2, both should be absent. + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize); + assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize); + // CF3 shouldn't have been touched. + assertEquals(cf3MemstoreSize, oldCF3MemstoreSize); + assertEquals(totalMemstoreSize + DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize); + assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3); + + // What happens when we hit the memstore limit, but we are not able to find + // any Column Family above the threshold? + // In that case, we should flush all the CFs. + + // Clearing the existing memstores. + region.flushcache(true); + + // The memstore limit is 200*1024 and the column family flush threshold is + // around 50*1024. We try to just hit the memstore limit with each CF's + // memstore being below the CF flush threshold. + for (int i = 1; i <= 300; i++) { + region.put(createPut(1, i)); + region.put(createPut(2, i)); + region.put(createPut(3, i)); + region.put(createPut(4, i)); + region.put(createPut(5, i)); + } + + region.flushcache(false); + // Since we won't find any CF above the threshold, and hence no specific + // store to flush, we should flush all the memstores. 
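As the comment above notes, when no single store crosses the lower bound the region falls back to flushing everything; the assertion that follows checks exactly that. A simplified model of the selection rule, consistent with what this test asserts; the store names and sizes are made up:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified decision rule: flush only stores over the lower bound;
 *  if none qualifies, fall back to flushing all stores. */
public class FlushSelectionSketch {
  static List<String> selectStoresToFlush(Map<String, Long> memstoreSizes,
      long lowerBound) {
    List<String> selected = new ArrayList<>();
    for (Map.Entry<String, Long> e : memstoreSizes.entrySet()) {
      if (e.getValue() > lowerBound) {
        selected.add(e.getKey());
      }
    }
    // No single family is large enough: flush everything, as the test asserts.
    return selected.isEmpty() ? new ArrayList<>(memstoreSizes.keySet()) : selected;
  }

  public static void main(String[] args) {
    Map<String, Long> sizes = new LinkedHashMap<>();
    sizes.put("f1", 30000L);
    sizes.put("f2", 35000L);
    sizes.put("f3", 40000L);
    // With a 50 KB lower bound nothing qualifies, so all three are flushed.
    System.out.println(selectStoresToFlush(sizes, 50 * 1024)); // [f1, f2, f3]
  }
}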
+    assertEquals(0, region.getMemstoreSize().get());
+  }
+
+  @Test
+  public void testSelectiveFlushWhenNotEnabled() throws IOException {
+    // Set up the configuration
+    Configuration conf = HBaseConfiguration.create();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 200 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+
+    // Initialize the HRegion
+    initHRegion("testSelectiveFlushWhenNotEnabled", conf);
+    // Add 1200 entries for CF1, 100 for CF2 and 50 for CF3
+    for (int i = 1; i <= 1200; i++) {
+      region.put(createPut(1, i));
+      if (i <= 100) {
+        region.put(createPut(2, i));
+        if (i <= 50) {
+          region.put(createPut(3, i));
+        }
+      }
+    }
+
+    long totalMemstoreSize = region.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    long cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    long cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    long cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+
+    // Some other sanity checks.
+    assertTrue(cf1MemstoreSize > 0);
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+
+    // The total memstore size should be the same as the sum of the sizes of
+    // memstores of CF1, CF2 and CF3.
+    assertEquals(totalMemstoreSize + 3 * DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize
+        + cf2MemstoreSize + cf3MemstoreSize);
+
+    // Flush!
+    region.flushcache(false);
+
+    cf1MemstoreSize = region.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
+    totalMemstoreSize = region.getMemstoreSize().get();
+    long smallestSeqInRegionCurrentMemstore =
+        region.getWAL().getEarliestMemstoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+
+    // Everything should have been cleared
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize);
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf3MemstoreSize);
+    assertEquals(0, totalMemstoreSize);
+    assertEquals(HConstants.NO_SEQNUM, smallestSeqInRegionCurrentMemstore);
+  }
+
+  // Find the (first) region of the specified table.
+  private static Pair<HRegion, HRegionServer> getRegionWithName(TableName tableName) {
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
+    for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
+      HRegionServer hrs = rsts.get(i).getRegionServer();
+      for (HRegion region : hrs.getOnlineRegions(tableName)) {
+        return Pair.newPair(region, hrs);
+      }
+    }
+    return null;
+  }
+
+  @Test
+  public void testLogReplay() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 20000);
+    // Carefully chosen limits so that the memstore just flushes when we're done
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 10000);
+    final int numRegionServers = 4;
+    TEST_UTIL.startMiniCluster(numRegionServers);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    HTable table = TEST_UTIL.createTable(TABLENAME, families);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte[] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    // Add 80 edits for CF1, 10 for CF2, 10 for CF3.
+    // These will all be interleaved in the log.
+    for (int i = 1; i <= 80; i++) {
+      table.put(createPut(1, i));
+      if (i <= 10) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+    }
+    table.flushCommits();
+    Thread.sleep(1000);
+
+    Pair<HRegion, HRegionServer> desiredRegionAndServer = getRegionWithName(TABLENAME);
+    HRegion desiredRegion = desiredRegionAndServer.getFirst();
+    assertTrue("Could not find a region which hosts the table.", desiredRegion != null);
+
+    // Flush the region selectively.
+    desiredRegion.flushcache(false);
+
+    long totalMemstoreSize;
+    long cf1MemstoreSize, cf2MemstoreSize, cf3MemstoreSize;
+    totalMemstoreSize = desiredRegion.getMemstoreSize().get();
+
+    // Find the sizes of the memstores of each CF.
+    cf1MemstoreSize = desiredRegion.getStore(FAMILY1).getMemStoreSize();
+    cf2MemstoreSize = desiredRegion.getStore(FAMILY2).getMemStoreSize();
+    cf3MemstoreSize = desiredRegion.getStore(FAMILY3).getMemStoreSize();
+
+    // CF1 should have been flushed
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, cf1MemstoreSize);
+    // CF2 and CF3 shouldn't have been flushed.
+    assertTrue(cf2MemstoreSize > 0);
+    assertTrue(cf3MemstoreSize > 0);
+    assertEquals(totalMemstoreSize + 2 * DefaultMemStore.DEEP_OVERHEAD, cf2MemstoreSize
+        + cf3MemstoreSize);
+
+    // Wait for the RS report to go across to the master, so that the master
+    // is aware of which sequence ids have been flushed, before we kill the RS.
+    // If, in production, the RS dies before the report goes across, we will
+    // safely replay all the edits.
+    Thread.sleep(2000);
+
+    // Abort the region server where we have the region hosted.
+    HRegionServer rs = desiredRegionAndServer.getSecond();
+    rs.abort("testing");
+
+    // The aborted region server's regions will be eventually assigned to some
+    // other region server, and the get RPC call (inside verifyEdit()) will
+    // retry for some time till the regions come back up.
+
+    // Verify that all the edits are safe.
+    for (int i = 1; i <= 80; i++) {
+      verifyEdit(1, i, table);
+      if (i <= 10) {
+        verifyEdit(2, i, table);
+        verifyEdit(3, i, table);
+      }
+    }
+
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  // Test Log Replay with Distributed Replay on.
+  // In distributed log replay, the log splitters ask the master for the
+  // last flushed sequence id for a region. This test ensures that we
+  // are doing the book-keeping correctly.
+  @Test
+  public void testLogReplayWithDistributedReplay() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
+    testLogReplay();
+  }
+
+  /**
+   * When a log roll is about to happen, we do a flush of the regions that will be affected by
+   * the log roll. These flushes cannot be selective flushes, otherwise we cannot roll the logs.
+   * This test ensures that we do a full flush in that scenario.
+   * @throws IOException
+   */
+  @Test
+  public void testFlushingWhenLogRolling() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 300000);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 100000);
+
+    // Also, let us try real hard to get a log roll to happen.
+    // Keeping the log roll period to 2s.
+    conf.setLong("hbase.regionserver.logroll.period", 2000);
+    // Keep the block size small so that we fill up the log files very fast.
+    conf.setLong("hbase.regionserver.hlog.blocksize", 6144);
+    int maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
+
+    final int numRegionServers = 4;
+    TEST_UTIL.startMiniCluster(numRegionServers);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    HTable table = TEST_UTIL.createTable(TABLENAME, families);
+    HTableDescriptor htd = table.getTableDescriptor();
+
+    for (byte[] family : families) {
+      if (!htd.hasFamily(family)) {
+        htd.addFamily(new HColumnDescriptor(family));
+      }
+    }
+
+    HRegion desiredRegion = getRegionWithName(TABLENAME).getFirst();
+    assertTrue("Could not find a region which hosts the table.", desiredRegion != null);
+
+    // Add some edits. Most will be for CF1, some for CF2 and CF3.
+    for (int i = 1; i <= 10000; i++) {
+      table.put(createPut(1, i));
+      if (i <= 200) {
+        table.put(createPut(2, i));
+        table.put(createPut(3, i));
+      }
+      table.flushCommits();
+      // Keep adding until we exceed the number of log files, so that we are
+      // able to trigger the cleaning of old log files.
+      int currentNumLogFiles = ((FSHLog) (desiredRegion.getWAL())).getNumLogFiles();
+      if (currentNumLogFiles > maxLogs) {
+        LOG.info("The number of log files is now: " + currentNumLogFiles
+            + ". Expect a log roll and memstore flush.");
+        break;
+      }
+    }
+    table.close();
+    // Wait for some time till the flush caused by log rolling happens.
+    Thread.sleep(4000);
+
+    // We have artificially created the conditions for a log roll. When a
+    // log roll happens, we should flush all the column families. Testing that
+    // case here.
+
+    // Individual families should have been flushed.
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY1).getMemStoreSize());
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY2).getMemStoreSize());
+    assertEquals(DefaultMemStore.DEEP_OVERHEAD, desiredRegion.getStore(FAMILY3).getMemStoreSize());
+
+    // And of course, the total memstore should also be clean.
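The assertions below hinge on the log roller asking for a flush of all stores. A minimal sketch of that call under the revised FlushRequester interface; the wiring around it is hypothetical, not the LogRoller code:

import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;

/** Why the roller passes forceFlushAllStores = true: a selective flush could
 *  leave old edits in the un-flushed families, pinning the rolled WAL forever. */
public class LogRollFlushSketch {
  static void onLogRollRequested(FlushRequester flusher, HRegion region) {
    // Full flush: afterwards no family retains edits from the old WAL,
    // so the file becomes archivable.
    flusher.requestFlush(region, true);
  }
}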
+    assertEquals(0, desiredRegion.getMemstoreSize().get());
+
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private void doPut(HTableInterface table) throws IOException {
+    // cf1 100B per row, cf2 200B per row and cf3 400B per row
+    byte[] qf = Bytes.toBytes("qf");
+    Random rand = new Random();
+    byte[] value1 = new byte[100];
+    byte[] value2 = new byte[200];
+    byte[] value3 = new byte[400];
+    for (int i = 0; i < 10000; i++) {
+      Put put = new Put(Bytes.toBytes("row-" + i));
+      rand.setSeed(i);
+      rand.nextBytes(value1);
+      rand.nextBytes(value2);
+      rand.nextBytes(value3);
+      put.add(FAMILY1, qf, value1);
+      put.add(FAMILY2, qf, value2);
+      put.add(FAMILY3, qf, value3);
+      table.put(put);
+    }
+  }
+
+  // Under the same write load, small stores should have fewer store files when
+  // per-column-family flush is enabled.
+  @Test
+  public void testCompareStoreFileCount() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushAllStoresPolicy.class.getName());
+    conf.setLong(FlushLargeStoresPolicy.HREGION_COLUMNFAMILY_FLUSH_SIZE_LOWER_BOUND, 400 * 1024);
+    conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
+    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      ConstantSizeRegionSplitPolicy.class.getName());
+
+    HTableDescriptor htd = new HTableDescriptor(TABLENAME);
+    htd.setCompactionEnabled(false);
+    htd.addFamily(new HColumnDescriptor(FAMILY1));
+    htd.addFamily(new HColumnDescriptor(FAMILY2));
+    htd.addFamily(new HColumnDescriptor(FAMILY3));
+
+    LOG.info("==============Test with selective flush disabled===============");
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    getRegionWithName(TABLENAME).getFirst();
+    HConnection conn = HConnectionManager.createConnection(conf);
+    HTableInterface table = conn.getTable(TABLENAME);
+    doPut(table);
+    table.close();
+    conn.close();
+
+    HRegion region = getRegionWithName(TABLENAME).getFirst();
+    int cf1StoreFileCount = region.getStore(FAMILY1).getStorefilesCount();
+    int cf2StoreFileCount = region.getStore(FAMILY2).getStorefilesCount();
+    int cf3StoreFileCount = region.getStore(FAMILY3).getStorefilesCount();
+    TEST_UTIL.shutdownMiniCluster();
+
+    LOG.info("==============Test with selective flush enabled===============");
+    conf.set(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushLargeStoresPolicy.class.getName());
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseAdmin().createNamespace(
+      NamespaceDescriptor.create(TABLENAME.getNamespaceAsString()).build());
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    conn = HConnectionManager.createConnection(conf);
+    table = conn.getTable(TABLENAME);
+    doPut(table);
+    table.close();
+    conn.close();
+
+    region = getRegionWithName(TABLENAME).getFirst();
+    int cf1StoreFileCount1 = region.getStore(FAMILY1).getStorefilesCount();
+    int cf2StoreFileCount1 = region.getStore(FAMILY2).getStorefilesCount();
+    int cf3StoreFileCount1 = region.getStore(FAMILY3).getStorefilesCount();
+    TEST_UTIL.shutdownMiniCluster();
+
+    LOG.info("disable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount
+        + ", " + Bytes.toString(FAMILY2) + "=>" + cf2StoreFileCount + ", "
+        + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount);
+    LOG.info("enable selective flush: " + Bytes.toString(FAMILY1) + "=>" + cf1StoreFileCount1
+        + ", " + Bytes.toString(FAMILY2)
+ "=>" + cf2StoreFileCount1 + ", " + + Bytes.toString(FAMILY3) + "=>" + cf3StoreFileCount1); + // small CF will have less store files. + assertTrue(cf1StoreFileCount1 < cf1StoreFileCount); + assertTrue(cf2StoreFileCount1 < cf2StoreFileCount); + } + + public static void main(String[] args) throws Exception { + int numRegions = Integer.parseInt(args[0]); + long numRows = Long.parseLong(args[1]); + + HTableDescriptor htd = new HTableDescriptor(TABLENAME); + htd.setMaxFileSize(10L * 1024 * 1024 * 1024); + htd.setValue(HTableDescriptor.SPLIT_POLICY, ConstantSizeRegionSplitPolicy.class.getName()); + htd.addFamily(new HColumnDescriptor(FAMILY1)); + htd.addFamily(new HColumnDescriptor(FAMILY2)); + htd.addFamily(new HColumnDescriptor(FAMILY3)); + + Configuration conf = HBaseConfiguration.create(); + HConnection conn = HConnectionManager.createConnection(conf); + HBaseAdmin admin = new HBaseAdmin(conn); + if (admin.tableExists(TABLENAME)) { + admin.disableTable(TABLENAME); + admin.deleteTable(TABLENAME); + } + if (numRegions >= 3) { + byte[] startKey = new byte[16]; + byte[] endKey = new byte[16]; + Arrays.fill(endKey, (byte) 0xFF); + admin.createTable(htd, startKey, endKey, numRegions); + } else { + admin.createTable(htd); + } + admin.close(); + + HTableInterface table = conn.getTable(TABLENAME); + byte[] qf = Bytes.toBytes("qf"); + Random rand = new Random(); + byte[] value1 = new byte[16]; + byte[] value2 = new byte[256]; + byte[] value3 = new byte[4096]; + for (long i = 0; i < numRows; i++) { + Put put = new Put(Hashing.md5().hashLong(i).asBytes()); + rand.setSeed(i); + rand.nextBytes(value1); + rand.nextBytes(value2); + rand.nextBytes(value3); + put.add(FAMILY1, qf, value1); + put.add(FAMILY2, qf, value2); + put.add(FAMILY3, qf, value3); + table.put(put); + if (i % 10000 == 0) { + LOG.info(i + " rows put"); + } + } + table.close(); + conn.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java index 6182cca..970b0f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; @@ -43,8 +44,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.Coprocessor; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -158,18 +159,15 @@ public class TestFSHLog { } } - protected void addEdits(WAL log, HRegionInfo hri, TableName tableName, - int times, AtomicLong sequenceId) throws IOException { - HTableDescriptor htd = new HTableDescriptor(); - htd.addFamily(new HColumnDescriptor("row")); - - final byte [] row = Bytes.toBytes("row"); + protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd, int times, + AtomicLong sequenceId) throws IOException { + final byte[] row = Bytes.toBytes("row"); for (int i = 0; i < times; i++) { long timestamp = System.currentTimeMillis(); WALEdit cols = 
new WALEdit(); cols.add(new KeyValue(row, row, row, timestamp, row)); - log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols, - sequenceId, true, null); + log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp), + cols, sequenceId, true, null); } log.sync(); } @@ -179,8 +177,8 @@ public class TestFSHLog { * @param wal * @param regionEncodedName */ - protected void flushRegion(WAL wal, byte[] regionEncodedName) { - wal.startCacheFlush(regionEncodedName); + protected void flushRegion(WAL wal, byte[] regionEncodedName, Set flushedFamilyNames) { + wal.startCacheFlush(regionEncodedName, flushedFamilyNames); wal.completeCacheFlush(regionEncodedName); } @@ -254,10 +252,14 @@ public class TestFSHLog { conf1.setInt("hbase.regionserver.maxlogs", 1); FSHLog wal = new FSHLog(fs, FSUtils.getRootDir(conf1), dir.toString(), HConstants.HREGION_OLDLOGDIR_NAME, conf1, null, true, null, null); - TableName t1 = TableName.valueOf("t1"); - TableName t2 = TableName.valueOf("t2"); - HRegionInfo hri1 = new HRegionInfo(t1, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); - HRegionInfo hri2 = new HRegionInfo(t2, HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HTableDescriptor t1 = + new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row")); + HTableDescriptor t2 = + new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row")); + HRegionInfo hri1 = + new HRegionInfo(t1.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + HRegionInfo hri2 = + new HRegionInfo(t2.getTableName(), HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); // variables to mock region sequenceIds final AtomicLong sequenceId1 = new AtomicLong(1); final AtomicLong sequenceId2 = new AtomicLong(1); @@ -284,12 +286,12 @@ public class TestFSHLog { assertEquals(hri1.getEncodedNameAsBytes(), regionsToFlush[0]); // flush region 1, and roll the wal file. Only last wal which has entries for region1 should // remain. - flushRegion(wal, hri1.getEncodedNameAsBytes()); + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); wal.rollWriter(); // only one wal should remain now (that is for the second region). assertEquals(1, wal.getNumRolledLogFiles()); // flush the second region - flushRegion(wal, hri2.getEncodedNameAsBytes()); + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys()); wal.rollWriter(true); // no wal should remain now. assertEquals(0, wal.getNumRolledLogFiles()); @@ -306,14 +308,14 @@ public class TestFSHLog { regionsToFlush = wal.findRegionsToForceFlush(); assertEquals(2, regionsToFlush.length); // flush both regions - flushRegion(wal, hri1.getEncodedNameAsBytes()); - flushRegion(wal, hri2.getEncodedNameAsBytes()); + flushRegion(wal, hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); + flushRegion(wal, hri2.getEncodedNameAsBytes(), t2.getFamiliesKeys()); wal.rollWriter(true); assertEquals(0, wal.getNumRolledLogFiles()); // Add an edit to region1, and roll the wal. addEdits(wal, hri1, t1, 2, sequenceId1); // tests partial flush: roll on a partial flush, and ensure that wal is not archived. 
- wal.startCacheFlush(hri1.getEncodedNameAsBytes()); + wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getFamiliesKeys()); wal.rollWriter(); wal.completeCacheFlush(hri1.getEncodedNameAsBytes()); assertEquals(1, wal.getNumRolledLogFiles()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java index f3651ae..dbb6b07 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java @@ -27,7 +27,10 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -786,13 +789,15 @@ public class TestWALReplay { // Add 1k to each family. final int countPerFamily = 1000; + Set familyNames = new HashSet(); for (HColumnDescriptor hcd: htd.getFamilies()) { addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee, wal, htd, sequenceId); + familyNames.add(hcd.getName()); } // Add a cache flush, shouldn't have any effect - wal.startCacheFlush(regionName); + wal.startCacheFlush(regionName, familyNames); wal.completeCacheFlush(regionName); // Add an edit to another family, should be skipped. @@ -832,11 +837,11 @@ public class TestWALReplay { final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { @Override - protected FlushResult internalFlushcache( - final WAL wal, final long myseqid, MonitoredTask status) + protected FlushResult internalFlushcache(final WAL wal, final long myseqid, + Collection storesToFlush, MonitoredTask status) throws IOException { LOG.info("InternalFlushCache Invoked"); - FlushResult fs = super.internalFlushcache(wal, myseqid, + FlushResult fs = super.internalFlushcache(wal, myseqid, storesToFlush, Mockito.mock(MonitoredTask.class)); flushcount.incrementAndGet(); return fs; @@ -950,16 +955,16 @@ public class TestWALReplay { private HRegion r; @Override - public void requestFlush(HRegion region) { + public void requestFlush(HRegion region, boolean forceFlushAllStores) { try { - r.flushcache(); + r.flushcache(forceFlushAllStores); } catch (IOException e) { throw new RuntimeException("Exception flushing", e); } } @Override - public void requestDelayedFlush(HRegion region, long when) { + public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { // TODO Auto-generated method stub } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java index 9798b6c..c4b60c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import java.io.IOException; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -142,18 +143,15 @@ public class TestDefaultWALProvider { } - protected void addEdits(WAL log, HRegionInfo hri, 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
index 9798b6c..c4b60c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestDefaultWALProvider.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
 import java.io.IOException;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -142,18 +143,15 @@ public class TestDefaultWALProvider {
   }
 
-  protected void addEdits(WAL log, HRegionInfo hri, TableName tableName,
+  protected void addEdits(WAL log, HRegionInfo hri, HTableDescriptor htd,
                           int times, AtomicLong sequenceId) throws IOException {
-    HTableDescriptor htd = new HTableDescriptor();
-    htd.addFamily(new HColumnDescriptor("row"));
-
-    final byte [] row = Bytes.toBytes("row");
+    final byte[] row = Bytes.toBytes("row");
     for (int i = 0; i < times; i++) {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
-      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), tableName, timestamp), cols,
-        sequenceId, true, null);
+      log.append(htd, hri, getWalKey(hri.getEncodedNameAsBytes(), htd.getTableName(), timestamp),
+        cols, sequenceId, true, null);
     }
     log.sync();
   }
@@ -170,8 +168,8 @@ public class TestDefaultWALProvider {
    * @param wal
    * @param regionEncodedName
    */
-  protected void flushRegion(WAL wal, byte[] regionEncodedName) {
-    wal.startCacheFlush(regionEncodedName);
+  protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
+    wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
     wal.completeCacheFlush(regionEncodedName);
   }
@@ -180,45 +178,47 @@ public class TestDefaultWALProvider {
   @Test
   public void testLogCleaning() throws Exception {
     LOG.info("testLogCleaning");
-    final TableName tableName =
-        TableName.valueOf("testLogCleaning");
-    final TableName tableName2 =
-        TableName.valueOf("testLogCleaning2");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning")).addFamily(new HColumnDescriptor(
+            "row"));
+    final HTableDescriptor htd2 =
+        new HTableDescriptor(TableName.valueOf("testLogCleaning2"))
+            .addFamily(new HColumnDescriptor("row"));
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     final AtomicLong sequenceId = new AtomicLong(1);
     try {
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(tableName2,
+      HRegionInfo hri2 = new HRegionInfo(htd2.getTableName(),
          HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
       // we want to mix edits from regions, so pick our own identifier.
       final WAL log = wals.getWAL(UNSPECIFIED_REGION);
 
       // Add a single edit and make sure that rolling won't remove the file
       // Before HBASE-3198 it used to delete it
-      addEdits(log, hri, tableName, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
       log.rollWriter();
       assertEquals(1, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // See if there's anything wrong with more than 1 edit
-      addEdits(log, hri, tableName, 2, sequenceId);
+      addEdits(log, hri, htd, 2, sequenceId);
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Now mix edits from 2 regions, still no flushing
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      addEdits(log, hri, tableName, 1, sequenceId);
-      addEdits(log, hri2, tableName2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      addEdits(log, hri, htd, 1, sequenceId);
+      addEdits(log, hri2, htd2, 1, sequenceId);
       log.rollWriter();
       assertEquals(3, DefaultWALProvider.getNumRolledLogFiles(log));
 
       // Flush the first region, we expect to see the first two files getting
       // archived. We need to append something or writer won't be rolled.
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -226,8 +226,8 @@ public class TestDefaultWALProvider {
       // Flush the second region, which removes all the remaining output files
       // since the oldest was completely flushed and the two others only contain
       // flush information
-      addEdits(log, hri2, tableName2, 1, sequenceId);
-      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      addEdits(log, hri2, htd2, 1, sequenceId);
+      log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getFamiliesKeys());
       log.completeCacheFlush(hri2.getEncodedNameAsBytes());
       log.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(log));
@@ -250,21 +250,25 @@ public class TestDefaultWALProvider {
    *
    *
    * @throws IOException
    */
-  @Test
+  @Test
   public void testWALArchiving() throws IOException {
     LOG.debug("testWALArchiving");
-    TableName table1 = TableName.valueOf("t1");
-    TableName table2 = TableName.valueOf("t2");
+    HTableDescriptor table1 =
+        new HTableDescriptor(TableName.valueOf("t1")).addFamily(new HColumnDescriptor("row"));
+    HTableDescriptor table2 =
+        new HTableDescriptor(TableName.valueOf("t2")).addFamily(new HColumnDescriptor("row"));
     final Configuration localConf = new Configuration(conf);
     localConf.set(WALFactory.WAL_PROVIDER, DefaultWALProvider.class.getName());
     final WALFactory wals = new WALFactory(localConf, null, currentTest.getMethodName());
     try {
       final WAL wal = wals.getWAL(UNSPECIFIED_REGION);
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
-      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
-      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
-          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri1 =
+          new HRegionInfo(table1.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 =
+          new HRegionInfo(table2.getTableName(), HConstants.EMPTY_START_ROW,
+              HConstants.EMPTY_END_ROW);
       // ensure that we don't split the regions.
       hri1.setSplit(false);
       hri2.setSplit(false);
@@ -283,7 +287,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add a waledit to table1, and flush the region.
       addEdits(wal, hri1, table1, 3, sequenceId1);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
       // roll log; all old logs should be archived.
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
@@ -297,7 +301,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // add edits for table2, and flush hri1.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri1.getEncodedNameAsBytes());
+      flushRegion(wal, hri1.getEncodedNameAsBytes(), table1.getFamiliesKeys());
       // the log : region-sequenceId map is
       // log1: region2 (unflushed)
       // log2: region1 (flushed)
@@ -307,7 +311,7 @@ public class TestDefaultWALProvider {
       assertEquals(2, DefaultWALProvider.getNumRolledLogFiles(wal));
       // flush region2, and all logs should be archived.
       addEdits(wal, hri2, table2, 2, sequenceId2);
-      flushRegion(wal, hri2.getEncodedNameAsBytes());
+      flushRegion(wal, hri2.getEncodedNameAsBytes(), table2.getFamiliesKeys());
       wal.rollWriter();
       assertEquals(0, DefaultWALProvider.getNumRolledLogFiles(wal));
     } finally {
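
Throughout TestDefaultWALProvider the tests now build an HTableDescriptor with
explicit column families instead of passing a bare TableName, because the family
names are needed twice: once for WAL.append() and once for startCacheFlush().
Condensed, the setup pattern is the following sketch (DescriptorSetupSketch and
familiesOf are illustrative names, not from this patch):

    import java.util.Set;

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    public class DescriptorSetupSketch {
      // A descriptor with an explicit family replaces the bare TableName the old
      // tests used; getFamiliesKeys() then yields the Set<byte[]> of family
      // names that startCacheFlush() expects.
      static Set<byte[]> familiesOf(String table, String family) {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(table))
            .addFamily(new HColumnDescriptor(family));
        return htd.getFamiliesKeys();
      }
    }
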
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 47b001a..72a4cca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -480,8 +480,9 @@ public class TestWALFactory {
   @Test
   public void testEditAdd() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     try {
@@ -496,16 +497,16 @@ public class TestWALFactory {
             Bytes.toBytes(Integer.toString(i)),
             timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo info = new HRegionInfo(tableName,
+      HRegionInfo info = new HRegionInfo(htd.getTableName(),
         row,Bytes.toBytes(Bytes.toString(row) + "1"), false);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(info.getEncodedNameAsBytes());
-      final long txid = log.append(htd, info, new WALKey(info.getEncodedNameAsBytes(), tableName,
-        System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid =
+          log.append(htd, info,
+            new WALKey(info.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+            cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(info.getEncodedNameAsBytes());
+      log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(info.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -519,7 +520,7 @@ public class TestWALFactory {
         WALKey key = entry.getKey();
         WALEdit val = entry.getEdit();
         assertTrue(Bytes.equals(info.getEncodedNameAsBytes(), key.getEncodedRegionName()));
-        assertTrue(tableName.equals(key.getTablename()));
+        assertTrue(htd.getTableName().equals(key.getTablename()));
         Cell cell = val.getCells().get(0);
         assertTrue(Bytes.equals(row, cell.getRow()));
         assertEquals((byte)(i + '0'), cell.getValue()[0]);
@@ -538,8 +539,9 @@ public class TestWALFactory {
   @Test
   public void testAppend() throws IOException {
     final int COL_COUNT = 10;
-    final TableName tableName =
-        TableName.valueOf("tablename");
+    final HTableDescriptor htd =
+        new HTableDescriptor(TableName.valueOf("tablename")).addFamily(new HColumnDescriptor(
+            "column"));
     final byte [] row = Bytes.toBytes("row");
     WAL.Reader reader = null;
     final AtomicLong sequenceId = new AtomicLong(1);
     try {
@@ -553,15 +555,15 @@ public class TestWALFactory {
             Bytes.toBytes(Integer.toString(i)),
             timestamp, new byte[] { (byte)(i + '0') }));
       }
-      HRegionInfo hri = new HRegionInfo(tableName,
+      HRegionInfo hri = new HRegionInfo(htd.getTableName(),
           HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
-      HTableDescriptor htd = new HTableDescriptor();
-      htd.addFamily(new HColumnDescriptor("column"));
       final WAL log = wals.getWAL(hri.getEncodedNameAsBytes());
-      final long txid = log.append(htd, hri, new WALKey(hri.getEncodedNameAsBytes(), tableName,
-        System.currentTimeMillis()), cols, sequenceId, true, null);
+      final long txid =
+          log.append(htd, hri,
+            new WALKey(hri.getEncodedNameAsBytes(), htd.getTableName(), System.currentTimeMillis()),
+            cols, sequenceId, true, null);
       log.sync(txid);
-      log.startCacheFlush(hri.getEncodedNameAsBytes());
+      log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getFamiliesKeys());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.shutdown();
       Path filename = DefaultWALProvider.getCurrentFileName(log);
@@ -573,7 +575,7 @@ public class TestWALFactory {
       for (Cell val : entry.getEdit().getCells()) {
         assertTrue(Bytes.equals(hri.getEncodedNameAsBytes(),
           entry.getKey().getEncodedRegionName()));
-        assertTrue(tableName.equals(entry.getKey().getTablename()));
+        assertTrue(htd.getTableName().equals(entry.getKey().getTablename()));
         assertTrue(Bytes.equals(row, val.getRow()));
         assertEquals((byte)(idx + '0'), val.getValue()[0]);
         System.out.println(entry.getKey() + " " + val);
-- 
1.9.1
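
A closing remark on coverage: the partial-flush case, where only some of a region's
families are flushed, is the heart of this port. The WAL must keep the oldest
unflushed sequence ids of the families left in the memstore so that their edits are
still replayed after a crash, which is what the partial-flush assertion in
TestFSHLog checks. Below is a hypothetical single-family variant of the test
helpers, for illustration only (PartialFlushSketch and flushOneFamily are not names
from this patch):

    import java.util.Set;
    import java.util.TreeSet;

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.wal.WAL;

    public class PartialFlushSketch {
      // Flush a single family. Because the other families keep their edits in
      // the memstore, the WAL may not archive logs that still carry those
      // edits, even after this flush completes.
      static void flushOneFamily(WAL wal, byte[] encodedRegionName, byte[] family) {
        // byte[] has identity hashCode, so sets of family names are built with
        // the byte-wise comparator rather than a HashSet.
        Set<byte[]> justOne = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        justOne.add(family);
        wal.startCacheFlush(encodedRegionName, justOne);
        // ... snapshot and write out just this family's memstore here ...
        wal.completeCacheFlush(encodedRegionName);
      }
    }
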