From 516d65ad76aa70359c6463cc2c372eb4d3fbdda5 Mon Sep 17 00:00:00 2001
From: Andrew Purtell
Date: Wed, 13 Aug 2014 19:38:00 -0700
Subject: [PATCH] HBASE-2821 Keep young storefiles at lower replication

Lower replication is defined as one replica less than default and a
minimum of 1
---
Note: an illustrative sketch of the replica arithmetic follows the patch,
after the version trailer.

 .../apache/hadoop/hbase/io/hfile/HFileContext.java | 19 ++++-
 .../hadoop/hbase/io/hfile/HFileContextBuilder.java | 12 ++-
 .../hadoop/hbase/io/hfile/AbstractHFileWriter.java |  5 +-
 .../org/apache/hadoop/hbase/io/hfile/HFile.java    |  3 +-
 .../hadoop/hbase/io/hfile/HFileWriterV2.java       |  3 +-
 .../hbase/regionserver/DefaultStoreFlusher.java    |  2 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   | 12 +++
 .../apache/hadoop/hbase/regionserver/HStore.java   |  5 +-
 .../apache/hadoop/hbase/regionserver/Store.java    |  5 +-
 .../regionserver/StorefileReplicationWatcher.java  | 92 ++++++++++++++++++++++
 .../hbase/regionserver/StripeStoreFlusher.java     |  2 +-
 .../regionserver/compactions/DefaultCompactor.java |  2 +-
 .../regionserver/compactions/StripeCompactor.java  |  2 +-
 .../java/org/apache/hadoop/hbase/util/FSUtils.java | 61 +++++++++++++-
 .../regionserver/TestCacheOnWriteInSchema.java     |  2 +-
 .../hadoop/hbase/regionserver/TestStore.java       |  3 +-
 .../hbase/regionserver/TestStripeCompactor.java    |  2 +-
 .../compactions/TestStripeCompactionPolicy.java    |  2 +-
 18 files changed, 211 insertions(+), 23 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileReplicationWatcher.java

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
index 3299e41..89fc5dd 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java
@@ -56,6 +56,8 @@ public class HFileContext implements HeapSize, Cloneable {
   private DataBlockEncoding encoding = DataBlockEncoding.NONE;
   /** Encryption algorithm and key used */
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
+  /** True if we should reduce the number of requested block replicas to one less than default */
+  private boolean reducedReplicas;
 
   //Empty constructor. Go with setters
   public HFileContext() {
@@ -76,12 +78,13 @@ public class HFileContext implements HeapSize, Cloneable {
     this.blocksize = context.blocksize;
     this.encoding = context.encoding;
     this.cryptoContext = context.cryptoContext;
+    this.reducedReplicas = context.reducedReplicas;
   }
 
   public HFileContext(boolean useHBaseChecksum, boolean includesMvcc, boolean includesTags,
       Compression.Algorithm compressAlgo, boolean compressTags, ChecksumType checksumType,
       int bytesPerChecksum, int blockSize, DataBlockEncoding encoding,
-      Encryption.Context cryptoContext) {
+      Encryption.Context cryptoContext, boolean reducedReplicas) {
     this.usesHBaseChecksum = useHBaseChecksum;
     this.includesMvcc = includesMvcc;
     this.includesTags = includesTags;
@@ -94,6 +97,7 @@ public class HFileContext implements HeapSize, Cloneable {
       this.encoding = encoding;
     }
     this.cryptoContext = cryptoContext;
+    this.reducedReplicas = reducedReplicas;
   }
 
   public Compression.Algorithm getCompression() {
@@ -160,6 +164,14 @@ public class HFileContext implements HeapSize, Cloneable {
     this.cryptoContext = cryptoContext;
   }
 
+  public boolean isReducedReplicas() {
+    return reducedReplicas;
+  }
+
+  public void setReducedReplicas(boolean reducedReplicas) {
+    this.reducedReplicas = reducedReplicas;
+  }
+
   /**
    * HeapSize implementation
    * NOTE : The heapsize should be altered as and when new state variable are added
@@ -171,8 +183,8 @@
         // Algorithm reference, encodingon, checksumtype, Encryption.Context reference
         4 * ClassSize.REFERENCE +
         2 * Bytes.SIZEOF_INT +
-        // usesHBaseChecksum, includesMvcc, includesTags and compressTags
-        4 * Bytes.SIZEOF_BOOLEAN);
+        // usesHBaseChecksum, includesMvcc, includesTags, compressTags, reducedReplicas
+        5 * Bytes.SIZEOF_BOOLEAN);
     return size;
   }
 
@@ -198,6 +210,7 @@
     sb.append(" includesTags="); sb.append(includesTags);
     sb.append(" compressAlgo="); sb.append(compressAlgo);
     sb.append(" compressTags="); sb.append(compressTags);
+    sb.append(" reducedReplicas="); sb.append(reducedReplicas);
     sb.append(" cryptoContext=[ "); sb.append(cryptoContext); sb.append(" ]");
     sb.append(" ]");
     return sb.toString();
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
index 4f90163..75bae17 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContextBuilder.java
@@ -52,7 +52,9 @@ public class HFileContextBuilder {
   private DataBlockEncoding encoding = DataBlockEncoding.NONE;
   /** Crypto context */
   private Encryption.Context cryptoContext = Encryption.Context.NONE;
-
+  /** if we are requesting reduced replication or not */
+  private boolean reducedReplicas;
+
   public HFileContextBuilder withHBaseCheckSum(boolean useHBaseCheckSum) {
     this.usesHBaseChecksum = useHBaseCheckSum;
     return this;
@@ -103,8 +105,14 @@
     return this;
   }
 
+  public HFileContextBuilder withReducedReplicas(boolean reducedReplicas) {
+    this.reducedReplicas = reducedReplicas;
+    return this;
+  }
+
   public HFileContext build() {
     return new HFileContext(usesHBaseChecksum, includesMvcc, includesTags, compression,
-        compressTags, checksumType, bytesPerChecksum, blocksize, encoding, cryptoContext);
+        compressTags, checksumType, bytesPerChecksum, blocksize, encoding, cryptoContext,
+        reducedReplicas);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
index 25e53cd..576585d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileWriter.java
@@ -260,9 +260,10 @@ public abstract class AbstractHFileWriter implements HFile.Writer {
 
   /** A helper method to create HFile output streams in constructors */
   protected static FSDataOutputStream createOutputStream(Configuration conf,
-      FileSystem fs, Path path, InetSocketAddress[] favoredNodes) throws IOException {
+      FileSystem fs, Path path, InetSocketAddress[] favoredNodes, boolean reducedReplicas)
+      throws IOException {
     FsPermission perms = FSUtils.getFilePermissions(fs, conf,
         HConstants.DATA_FILE_UMASK_KEY);
-    return FSUtils.create(fs, path, perms, favoredNodes);
+    return FSUtils.create(fs, path, perms, favoredNodes, reducedReplicas);
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index c0dd672..2662b8f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -299,7 +299,8 @@ public class HFile {
             "filesystem/path or path");
       }
       if (path != null) {
-        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
+        ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes,
+            fileContext.isReducedReplicas());
       }
 
       return createWriter(fs, path, ostream, comparator, fileContext);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index e6201bf..6b8cc01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -103,7 +103,8 @@
       FileSystem fs, Path path, FSDataOutputStream ostream, final KVComparator comparator,
       final HFileContext context) throws IOException {
     super(cacheConf,
-        ostream == null ? createOutputStream(conf, fs, path, null) : ostream,
+        ostream == null ? createOutputStream(conf, fs, path, null, context.isReducedReplicas()) :
+            ostream,
         path, comparator, context);
     finishInit(conf);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 1b24c2c..d4bd69b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -62,7 +62,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
       status.setStatus("Flushing " + store + ": creating writer");
       // Write the map out to the disk
       writer = store.createWriterInTmp(
-          cellsCount, store.getFamily().getCompression(), false, true, true);
+          cellsCount, store.getFamily().getCompression(), false, true, true, true);
       writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
       try {
         performFlush(scanner, writer, smallestReadPoint);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8e62620..e1d2741 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -382,6 +382,9 @@ public class HRegionServer extends HasThread implements
   // chore for refreshing store files for secondary regions
   private StorefileRefresherChore storefileRefresher;
 
+  // chore for adjusting store file block replication as needed
+  private StorefileReplicationWatcher replicationWatcher;
+
   private RegionServerCoprocessorHost rsHost;
 
   private RegionServerProcedureManagerHost rspmHost;
@@ -712,6 +715,12 @@
     if (storefileRefreshPeriod > 0) {
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
     }
+    int replicationWatcherPeriod =
+      conf.getInt(StorefileReplicationWatcher.REGIONSERVER_REPLICATION_WATCHER_PERIOD,
+        StorefileReplicationWatcher.DEFAULT_REGIONSERVER_REPLICATION_WATCHER_PERIOD);
+    if (replicationWatcherPeriod > 0) {
+      this.replicationWatcher = new StorefileReplicationWatcher(replicationWatcherPeriod, this, this);
+    }
   }
 
   /**
@@ -845,6 +854,9 @@
     if (this.storefileRefresher != null) {
       this.storefileRefresher.interrupt();
     }
+    if (this.replicationWatcher != null) {
+      this.replicationWatcher.interrupt();
+    }
 
     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
     if (rspmHost != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c783d14..b5e66a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -921,11 +921,13 @@ public class HStore implements Store {
    * @param isCompaction whether we are creating a new file in a compaction
    * @param includesMVCCReadPoint - whether to include MVCC or not
    * @param includesTag - includesTag or not
+   * @param reducedReplicas whether to ask for a reduced number of replicas or not
    * @return Writer for a new StoreFile in the tmp dir.
    */
   @Override
   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
+      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+      boolean reducedReplicas)
   throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -942,6 +944,7 @@
     }
     HFileContext hFileContext = createFileContext(compression, includeMVCCReadpoint, includesTag,
       cryptoContext);
+    hFileContext.setReducedReplicas(reducedReplicas);
     StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
         this.getFileSystem())
             .withFilePath(fs.createTempName())
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 280a1b8..7796f7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -160,6 +160,8 @@ public interface Store extends HeapSize, StoreConfigInformation {
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
    * @param includeMVCCReadpoint whether we should out the MVCC readpoint
+   * @param includesTag true if the file will contain cells with tags
+   * @param reducedReplicas whether to ask for a reduced number of replicas or not
    * @return Writer for a new StoreFile in the tmp dir.
    */
   StoreFile.Writer createWriterInTmp(
@@ -167,7 +169,8 @@
     Compression.Algorithm compression,
     boolean isCompaction,
     boolean includeMVCCReadpoint,
-    boolean includesTags
+    boolean includesTags,
+    boolean reducedReplicas
   ) throws IOException;
 
   // Compaction oriented methods
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileReplicationWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileReplicationWatcher.java
new file mode 100644
index 0000000..3e2a5aa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileReplicationWatcher.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A chore for ensuring store files older than a configurable age have correct
+ * block replication settings.
+ */
+public class StorefileReplicationWatcher extends Chore {
+
+  private static final Log LOG = LogFactory.getLog(StorefileReplicationWatcher.class);
+
+  /**
+   * The period (in milliseconds) for monitoring store file replication
+   */
+  public static final String REGIONSERVER_REPLICATION_WATCHER_PERIOD
+    = "hbase.regionserver.storefile.replication.watcher.period";
+  static final int DEFAULT_REGIONSERVER_REPLICATION_WATCHER_PERIOD = 60 * 1000; // 1 minute
+
+  /**
+   * The minimum age (in seconds) for store files before they are monitored
+   */
+  public static final String REGIONSERVER_REPLICATION_WATCHER_MIN_AGE
+    = "hbase.regionserver.storefile.replication.watcher.min.age";
+  static final int DEFAULT_REGIONSERVER_REPLICATION_WATCHER_MIN_AGE = 5 * 60; // 5 minutes
+
+  private HRegionServer regionServer;
+  private int minAge;
+
+  public StorefileReplicationWatcher(int period, HRegionServer regionServer, Stoppable stoppable) {
+    super("StorefileReplicationWatcher", period, stoppable);
+    this.regionServer = regionServer;
+    this.minAge = regionServer.getConfiguration().getInt(REGIONSERVER_REPLICATION_WATCHER_MIN_AGE,
+      DEFAULT_REGIONSERVER_REPLICATION_WATCHER_MIN_AGE);
+  }
+
+  @Override
+  protected void chore() {
+    long now = System.currentTimeMillis();
+    for (HRegion r : regionServer.getOnlineRegionsLocalContext()) {
+      try {
+        for (Store store : r.getStores().values()) {
+          FileSystem fs = store.getFileSystem();
+          for (StoreFile file: store.getStorefiles()) {
+            FileStatus stat = fs.getFileStatus(file.getPath());
+            if (stat.getModificationTime() < (now - (minAge * 1000))) {
+              short wanted = fs.getDefaultReplication(file.getPath());
+              short have = stat.getReplication();
+              if (have < wanted) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Adjusting replication for " + file + ": have=" + have +
+                    ", wanted=" + wanted);
+                }
+                fs.setReplication(file.getPath(), wanted);
+              }
+            }
+          }
+        }
+      } catch (IOException ex) {
+        LOG.warn("Exception in StorefileReplicationWatcher for region:" + r.getRegionInfo() +
+          ", exception:" + StringUtils.stringifyException(ex));
+        continue;
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 768c691..6953bd6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -109,7 +109,7 @@ public class StripeStoreFlusher extends StoreFlusher {
       @Override
       public Writer createWriter() throws IOException {
         StoreFile.Writer writer = store.createWriterInTmp(
-            kvCount, store.getFamily().getCompression(), false, true, true);
+            kvCount, store.getFamily().getCompression(), false, true, true, false);
         writer.setTimeRangeTracker(tracker);
         return writer;
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index d5b2b63..1bcbe40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -77,7 +77,7 @@ public class DefaultCompactor extends Compactor {
         cleanSeqId = true;
       }
       writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-        fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+        fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0, false);
       boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId);
       if (!finished) {
         writer.close();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 487ff46..e273144 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -119,7 +119,7 @@ public class StripeCompactor extends Compactor {
       @Override
       public Writer createWriter() throws IOException {
         return store.createWriterInTmp(
-            fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+            fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0, false);
       }
     };
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 0f21b72..21ebf65 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -283,9 +283,37 @@ public abstract class FSUtils {
    */
   public static FSDataOutputStream create(FileSystem fs, Path path,
       FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
+    return create(fs, path, perm, favoredNodes, false);
+  }
+
+  /**
+   * Create the specified file on the filesystem. By default, this will:
+   * <ol>
+   * <li>overwrite the file if it exists</li>
+   * <li>apply the umask in the configuration (if it is enabled)</li>
+   * <li>use the fs configured buffer size (or 4096 if not set)</li>
+   * <li>use the default block size</li>
+   * <li>not track progress</li>
+   * </ol>
+   *
+   * @param fs {@link FileSystem} on which to write the file
+   * @param path {@link Path} to the file to write
+   * @param perm permissions
+   * @param favoredNodes an array of favored nodes, or possibly null
+   * @param reducedReplicas if true, ask for one replica fewer than the default (minimum of 1)
+   * @return output stream to the created file
+   * @throws IOException if the file cannot be created
+   */
+  public static FSDataOutputStream create(FileSystem fs, Path path,
+      FsPermission perm, InetSocketAddress[] favoredNodes, boolean reducedReplicas)
+      throws IOException {
     if (fs instanceof HFileSystem) {
       FileSystem backingFs = ((HFileSystem)fs).getBackingFs();
       if (backingFs instanceof DistributedFileSystem) {
+        short replicas = getDefaultReplication(backingFs, path);
+        if (reducedReplicas && replicas > 1) {
+          replicas--;
+        }
         // Try to use the favoredNodes version via reflection to allow backwards-
         // compatibility.
         try {
@@ -295,7 +323,7 @@
             Progressable.class, InetSocketAddress[].class)
             .invoke(backingFs, path, perm, true,
                 getDefaultBufferSize(backingFs),
-                getDefaultReplication(backingFs, path),
+                replicas,
                 getDefaultBlockSize(backingFs, path),
                 null, favoredNodes));
       } catch (InvocationTargetException ite) {
@@ -313,7 +341,7 @@
         }
       }
     }
-    return create(fs, path, perm, true);
+    return create(fs, path, perm, true, reducedReplicas);
   }
 
   /**
@@ -335,11 +363,36 @@
    */
   public static FSDataOutputStream create(FileSystem fs, Path path,
       FsPermission perm, boolean overwrite) throws IOException {
+    return create(fs, path, perm, overwrite, false);
+  }
+
+  /**
+   * Create the specified file on the filesystem. By default, this will:
+   * <ol>
+   * <li>apply the umask in the configuration (if it is enabled)</li>
+   * <li>use the fs configured buffer size (or 4096 if not set)</li>
+   * <li>use the default block size</li>
+   * <li>not track progress</li>
+   * </ol>
+   *
+   * @param fs {@link FileSystem} on which to write the file
+   * @param path {@link Path} to the file to write
+   * @param perm permissions
+   * @param overwrite Whether or not the created file should be overwritten.
+   * @return output stream to the created file
+   * @throws IOException if the file cannot be created
+   */
+  public static FSDataOutputStream create(FileSystem fs, Path path,
+      FsPermission perm, boolean overwrite, boolean reducedReplicas) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Creating file=" + path + " with permission=" + perm + ", overwrite=" + overwrite);
     }
-    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
-        getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
+    short replicas = getDefaultReplication(fs, path);
+    if (replicas > 1 && reducedReplicas) {
+      replicas--;
+    }
+    return fs.create(path, perm, overwrite, getDefaultBufferSize(fs), replicas,
+        getDefaultBlockSize(fs, path), null);
   }
 
   /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
index 8699954..575fc3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCacheOnWriteInSchema.java
@@ -206,7 +206,7 @@ public class TestCacheOnWriteInSchema {
   public void testCacheOnWriteInSchema() throws IOException {
     // Write some random data into the store
     StoreFile.Writer writer = store.createWriterInTmp(Integer.MAX_VALUE,
-        HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false);
+        HFile.DEFAULT_COMPRESSION_ALGORITHM, false, true, false, false);
     writeStoreFile(writer);
     writer.close();
     // Verify the block types of interest were cached on write
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 9600932..3eff9ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -258,7 +258,8 @@ public class TestStore {
     init(name.getMethodName(), conf, hcd);
 
     // Test createWriterInTmp()
-    StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false);
+    StoreFile.Writer writer = store.createWriterInTmp(4, hcd.getCompression(), false, true, false,
+      false);
     Path path = writer.getPath();
     writer.append(new KeyValue(row, family, qf1, Bytes.toBytes(1)));
     writer.append(new KeyValue(row, family, qf2, Bytes.toBytes(2)));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index 43b8254..a64771e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -187,7 +187,7 @@ public class TestStripeCompactor {
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(new KVComparator());
 
     return new StripeCompactor(conf, store) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 540b6d9..179b97a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -742,7 +742,7 @@ public class TestStripeCompactionPolicy {
     when(store.getFamily()).thenReturn(col);
     when(
       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
-        anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
     final Scanner scanner = new Scanner();
-- 
1.8.5.2 (Apple Git-48)
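
Appendix (not part of the patch): the commit message defines lower
replication as one replica less than the default, with a minimum of one.
The standalone sketch below mirrors the arithmetic this patch adds to
FSUtils.create(); the class and method names in the sketch are
illustrative only and do not appear in the patch.

    /** Illustrative sketch of the replica-count rule added to FSUtils.create(). */
    public class ReducedReplicationSketch {
      // One replica fewer than the filesystem default, never below one.
      static short requestedReplicas(short defaultReplication, boolean reducedReplicas) {
        short replicas = defaultReplication;
        if (reducedReplicas && replicas > 1) {
          replicas--;
        }
        return replicas;
      }

      public static void main(String[] args) {
        System.out.println(requestedReplicas((short) 3, true));  // 2: a freshly flushed file
        System.out.println(requestedReplicas((short) 1, true));  // 1: the floor is one replica
        System.out.println(requestedReplicas((short) 3, false)); // 3: callers keep the default
      }
    }

In the patch itself, DefaultStoreFlusher requests reduced replication for
flushed (young) store files, while the compactors and StripeStoreFlusher
pass false; the StorefileReplicationWatcher chore later raises any store
file older than hbase.regionserver.storefile.replication.watcher.min.age
that is still below the filesystem default back up to the default.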