From 6b8bdd2530676844756295a2b20b583d401246e5 Mon Sep 17 00:00:00 2001
From: Elliott Clark
Date: Tue, 5 Feb 2013 11:12:56 -0800
Subject: [PATCH] Compactions not sorting based on size anymore.

---
 .../hadoop/hbase/regionserver/StoreFile.java       |   6 +-
 .../DefaultCompactionPolicyRatioRight.java         | 393 +++++++++++++++++++
 .../compactions/DefaultCompactionPolicySort.java   | 394 +++++++++++++++++++
 .../DefaultCompactionPolicySortRatioRight.java     | 397 ++++++++++++++++++++
 .../DefaultCompactionPolicySortSmall.java          | 393 +++++++++++++++++++
 ...DefaultCompactionPolicySortSmallRatioRight.java | 396 +++++++++++++++++++
 .../regionserver/TestDefaultCompactSelection.java  |   6 +-
 .../compactions/TestDefaultCompactionPolicy.java   | 139 +++++++
 .../compactions/TestPerfCompactionPolicy.java      | 305 +++++++++++++++
 9 files changed, 2423 insertions(+), 6 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyRatioRight.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySort.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortRatioRight.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmall.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmallRatioRight.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java

diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 3eb6d25..871d88d 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -1761,7 +1761,7 @@ public class StoreFile {
   /**
    * Useful comparators for comparing StoreFiles.
    */
-  abstract static class Comparators {
+  public abstract static class Comparators {
    /**
     * Comparator that compares based on the Sequence Ids of the
     * the StoreFiles. Bulk loads that did not request a seq ID
@@ -1770,7 +1770,7 @@ public class StoreFile {
     * the bulkLoadTime is used to determine the ordering.
     * If there are ties, the path name is used as a tie-breaker.
*/ - static final Comparator SEQ_ID = + public static final Comparator SEQ_ID = Ordering.compound(ImmutableList.of( Ordering.natural().onResultOf(new GetSeqId()), Ordering.natural().onResultOf(new GetBulkTime()), @@ -1802,7 +1802,7 @@ public class StoreFile { /** * FILE_SIZE = descending sort StoreFiles (largest --> smallest in size) */ - static final Comparator FILE_SIZE = Ordering.natural().reverse() + public static final Comparator FILE_SIZE = Ordering.natural().reverse() .onResultOf(new Function() { @Override public Long apply(StoreFile sf) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyRatioRight.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyRatioRight.java new file mode 100644 index 0000000..90939e0 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyRatioRight.java @@ -0,0 +1,393 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. 
+ */ +@InterfaceAudience.Private +public class DefaultCompactionPolicyRatioRight extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicyRatioRight.class); + private final static Calendar calendar = new GregorianCalendar(); + + public DefaultCompactionPolicyRatioRight() { + compactor = new DefaultCompactor(this); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + // Preliminary compaction subject to filters + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.store.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. + boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. 
+ * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(comConf.getMaxFilesToCompact(), + candidates.getFilesToCompact().size()); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. 
Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *
+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + start = Math.max(0, countOfFiles - comConf.getMinFilesToCompact()); + while (start > 0 && + ( fileSizes[start -1] < Math.max(comConf.getMinCompactSize(), (long) (sumSize[start] * ratio)))) { + + --start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.store.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. + if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySort.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySort.java new file mode 100644 index 0000000..fe52028 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySort.java @@ -0,0 +1,394 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. + */ +@InterfaceAudience.Private +public class DefaultCompactionPolicySort extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicySort.class); + private final static Calendar calendar = new GregorianCalendar(); + + public DefaultCompactionPolicySort() { + compactor = new DefaultCompactor(this); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + // Preliminary compaction subject to filters + Collections.sort(candidateFiles, StoreFile.Comparators.FILE_SIZE); + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.store.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. 
+ boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. + * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. 
Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(comConf.getMaxFilesToCompact(), + candidates.getFilesToCompact().size()); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *
+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + while (countOfFiles - start >= comConf.getMinFilesToCompact() && + fileSizes[start] > Math.max(comConf.getMinCompactSize(), + (long) (sumSize[start + 1] * ratio))) { + ++start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.store.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. + if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortRatioRight.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortRatioRight.java new file mode 100644 index 0000000..2207d88 --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortRatioRight.java @@ -0,0 +1,397 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. + */ +@InterfaceAudience.Private +public class DefaultCompactionPolicySortRatioRight extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicySortRatioRight.class); + private final static Calendar calendar = new GregorianCalendar(); + + public DefaultCompactionPolicySortRatioRight() { + compactor = new DefaultCompactor(this); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + + Collections.sort(candidateFiles, StoreFile.Comparators.FILE_SIZE); + + // Preliminary compaction subject to filters + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.store.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. 
+ boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. + * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. 
Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(comConf.getMaxFilesToCompact(), + candidates.getFilesToCompact().size()); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *
+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + start = Math.max(0, countOfFiles - comConf.getMinFilesToCompact()); + while (start > 0 && + ( fileSizes[start -1] < Math.max(comConf.getMinCompactSize(), (long) (sumSize[start] * ratio)))) { + + --start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.store.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. + if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmall.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmall.java new file mode 100644 index 0000000..d0f573c --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmall.java @@ -0,0 +1,393 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. + */ +@InterfaceAudience.Private +public class DefaultCompactionPolicySortSmall extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicySortSmall.class); + private final static Calendar calendar = new GregorianCalendar(); + + public DefaultCompactionPolicySortSmall() { + compactor = new DefaultCompactor(this); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + // Preliminary compaction subject to filters + Collections.sort(candidateFiles, StoreFile.Comparators.FILE_SIZE); + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.store.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. 
+ boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. + * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. 
Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(0,excess); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *
+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + while (countOfFiles - start >= comConf.getMinFilesToCompact() && + fileSizes[start] > Math.max(comConf.getMinCompactSize(), + (long) (sumSize[start + 1] * ratio))) { + ++start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.store.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. + if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmallRatioRight.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmallRatioRight.java new file mode 100644 index 0000000..0c48adb --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicySortSmallRatioRight.java @@ -0,0 +1,396 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. + */ +@InterfaceAudience.Private +public class DefaultCompactionPolicySortSmallRatioRight extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicySortSmallRatioRight.class); + private final static Calendar calendar = new GregorianCalendar(); + + public DefaultCompactionPolicySortSmallRatioRight() { + compactor = new DefaultCompactor(this); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + + Collections.sort(candidateFiles, StoreFile.Comparators.FILE_SIZE); + + // Preliminary compaction subject to filters + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.store.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. 
+ boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. + * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. 
Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(0,excess); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *

+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + start = Math.max(0, countOfFiles - comConf.getMinFilesToCompact()); + while (start > 0 && + ( fileSizes[start -1] < Math.max(comConf.getMinCompactSize(), (long) (sumSize[start] * ratio)))) { + + --start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.store.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. + if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java index 5d3358d..0502b2c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -259,7 +259,7 @@ public class TestDefaultCompactSelection extends TestCase { */ // don't exceed max file compact threshold // note: file selection starts with largest to smallest. 
- compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1); /* MAJOR COMPACTION */ // if a major compaction has been forced, then compact everything @@ -270,7 +270,7 @@ public class TestDefaultCompactSelection extends TestCase { compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12); // don't exceed max file compact threshold, even with major compaction store.forceMajor = true; - compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1); store.forceMajor = false; // if we exceed maxCompactSize, downgrade to minor // if not, it creates a 'snowball effect' when files >> maxCompactSize: @@ -295,7 +295,7 @@ public class TestDefaultCompactSelection extends TestCase { // reference files shouldn't obey max threshold compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12); // reference files should obey max file compact to avoid OOM - compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1); // empty case compactEquals(new ArrayList() /* empty */); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java new file mode 100644 index 0000000..bd0c5e7 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.*; + +@Category(SmallTests.class) +public class TestDefaultCompactionPolicy { + + Configuration configuration; + DefaultCompactionPolicy dcp; + List candidates; + + + @Before + public void setUp() { + + configuration = HBaseConfiguration.create(); + + //Make sure that this doesn't include every file. + configuration.setInt("hbase.hstore.compaction.max", 4); + //But it must include three files at least. 
+ configuration.setInt("hbase.hstore.compaction.min", 3); + + dcp = new DefaultCompactionPolicy(); + //Assign the store first so that updateConfiguration will create a compactionconf. + dcp.store = createMockStore(); + + //Now set the conf. + dcp.setConf(configuration); + + candidates = new ArrayList(5); + } + + @Test + public void testSelectCompactionMaxFiles() throws Exception { + for(int i = 100; i < 350; i++) { + candidates.add(createMockStoreFile(i)); + } + + CompactSelection sel = dcp.selectCompaction(candidates, false, false); + + //4 is the max files so assume the rest got dropped. + assertEquals(4, sel.getFilesToCompact().size()); + } + + @Test + public void testSelectCompactionMinFiles() throws Exception { + candidates.add(createMockStoreFile(101)); + candidates.add(createMockStoreFile(102)); + CompactSelection sel = dcp.selectCompaction(candidates, false, false); + assertEquals(0, sel.getFilesToCompact().size()); // 0 because 2 < min num files. + + } + + @Test + public void testSelectionCompactionRation() throws Exception { + + // Add four files. Which is equal to the max. However one should be excluded due to + // being too large based on the ratio. + StoreFile large = createMockStoreFile((int)(310 * dcp.comConf.getCompactionRatio())); + candidates.add(large); + candidates.add(createMockStoreFile(100)); + candidates.add(createMockStoreFile(100)); + candidates.add(createMockStoreFile(100)); + + CompactSelection sel = dcp.selectCompaction(candidates, false, false); + assertFalse(sel.getFilesToCompact().contains(large)); + } + +// @Test +// public void testSelectCompactionForOutOfOrderSize() throws Exception { +// +// //Order is important here. We want to make sure that sorting takes place. +// candidates.add(createMockStoreFile(256)); +// candidates.add(createMockStoreFile(256)); +// candidates.add(createMockStoreFile(256)); +// StoreFile large = createMockStoreFile(746); +// candidates.add(large); +// candidates.add(createMockStoreFile(256)); +// +// CompactSelection sel = dcp.selectCompaction(candidates, false, false); +// assertFalse(sel.getFilesToCompact().contains(large)); +// } + + private StoreFile createMockStoreFile(int sizeMb) { + StoreFile mockSf = mock(StoreFile.class); + StoreFile.Reader reader = mock(StoreFile.Reader.class); + + when(reader.getSequenceID()).thenReturn(-1l); + when(reader.getTotalUncompressedBytes()).thenReturn(1024l * 1024l * sizeMb); + when(reader.length()).thenReturn(1024l * 1024l * sizeMb); + + when(mockSf.excludeFromMinorCompaction()).thenReturn(false); + when(mockSf.getReader()).thenReturn(reader); + when(mockSf.toString()).thenReturn("MockStoreFile File Size: " + sizeMb); + + return mockSf; + } + + private HStore createMockStore() { + HStore s = mock(HStore.class); + when(s.getStoreFileTtl()).thenReturn(Long.MAX_VALUE); + + return s; + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java new file mode 100644 index 0000000..485ae7d --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(SmallTests.class) +@RunWith(Parameterized.class) +public class TestPerfCompactionPolicy { + + static final Log LOG = LogFactory.getLog(TestPerfCompactionPolicy.class); + + private final String name; + private final CompactionPolicy cp; + private final int max; + private final int min; + private final float ratio; + private long written = 0; + private long iterations = 0; + private long startingFiles = 0; + private long endingFiles = 0; + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {"Default", new DefaultCompactionPolicy(), 3, 2, 1.2f}, + {"Default", new DefaultCompactionPolicy(), 4, 2, 1.2f}, + {"Default", new DefaultCompactionPolicy(), 5, 2, 1.2f}, + {"Default", new DefaultCompactionPolicy(), 6, 2, 1.2f}, + {"Default", new DefaultCompactionPolicy(), 7, 2, 1.2f}, + {"Default", new DefaultCompactionPolicy(), 4, 2, 1.3f}, + {"Default", new DefaultCompactionPolicy(), 4, 2, 1.4f}, + {"Default", new DefaultCompactionPolicy(), 4, 2, 1.5f}, + + + + {"Sort", new DefaultCompactionPolicySort(), 3, 2, 1.2f}, + {"Sort", new DefaultCompactionPolicySort(), 4, 2, 1.2f}, + {"Sort", new DefaultCompactionPolicySort(), 5, 2, 1.2f}, + {"Sort", new DefaultCompactionPolicySort(), 6, 2, 1.2f}, + {"Sort", new DefaultCompactionPolicySort(), 7, 2, 1.2f}, + {"Sort", new DefaultCompactionPolicySort(), 4, 2, 1.3f}, + {"Sort", new DefaultCompactionPolicySort(), 4, 2, 1.4f}, + {"Sort", new DefaultCompactionPolicySort(), 4, 2, 1.5f}, + + + + {"Ratio", new DefaultCompactionPolicyRatioRight(), 3, 2, 1.2f}, + {"Ratio", new DefaultCompactionPolicyRatioRight(), 4, 2, 1.2f}, + {"Ratio", new DefaultCompactionPolicyRatioRight(), 5, 2, 1.2f}, + {"Ratio", new DefaultCompactionPolicyRatioRight(), 6, 2, 1.2f}, + {"Ratio", new DefaultCompactionPolicyRatioRight(), 7, 2, 1.2f}, + {"Ratio", new DefaultCompactionPolicyRatioRight(), 4, 2, 1.3f}, + {"Ratio", new DefaultCompactionPolicyRatioRight(), 4, 2, 1.4f}, + {"Ratio", new DefaultCompactionPolicyRatioRight(), 4, 2, 1.5f}, + + + {"SortRatio", new DefaultCompactionPolicySortRatioRight(), 3, 2, 1.2f}, + {"SortRatio", 
new DefaultCompactionPolicySortRatioRight(), 4, 2, 1.2f}, + {"SortRatio", new DefaultCompactionPolicySortRatioRight(), 5, 2, 1.2f}, + {"SortRatio", new DefaultCompactionPolicySortRatioRight(), 6, 2, 1.2f}, + {"SortRatio", new DefaultCompactionPolicySortRatioRight(), 7, 2, 1.2f}, + {"SortRatio", new DefaultCompactionPolicySortRatioRight(), 4, 2, 1.3f}, + {"SortRatio", new DefaultCompactionPolicySortRatioRight(), 4, 2, 1.4f}, + {"SortRatio", new DefaultCompactionPolicySortRatioRight(), 4, 2, 1.5f}, + + + {"SortSmall", new DefaultCompactionPolicySortSmall(), 3, 2, 1.2f}, + {"SortSmall", new DefaultCompactionPolicySortSmall(), 4, 2, 1.2f}, + {"SortSmall", new DefaultCompactionPolicySortSmall(), 5, 2, 1.2f}, + {"SortSmall", new DefaultCompactionPolicySortSmall(), 6, 2, 1.2f}, + {"SortSmall", new DefaultCompactionPolicySortSmall(), 7, 2, 1.2f}, + {"SortSmall", new DefaultCompactionPolicySortSmall(), 4, 2, 1.3f}, + {"SortSmall", new DefaultCompactionPolicySortSmall(), 4, 2, 1.4f}, + {"SortSmall", new DefaultCompactionPolicySortSmall(), 4, 2, 1.5f}, + + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 3, 2, 1.2f}, + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 4, 2, 1.2f}, + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 5, 2, 1.2f}, + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 6, 2, 1.2f}, + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 7, 2, 1.2f}, + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 4, 2, 1.3f}, + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 4, 2, 1.4f}, + {"SortSmallRatio", new DefaultCompactionPolicySortSmallRatioRight(), 4, 2, 1.5f}, + }); + } + + public TestPerfCompactionPolicy(String name, CompactionPolicy cp, int max, int min, float ratio) { + this.max = max; + this.min = min; + this.ratio = ratio; + + org.apache.log4j.Logger.getLogger(CompactionConfiguration.class). + setLevel(org.apache.log4j.Level.ERROR); + + org.apache.log4j.Logger.getLogger(cp.getClass()). + setLevel(org.apache.log4j.Level.ERROR); + + this.name = name; + this.cp = cp; + + + Configuration configuration = HBaseConfiguration.create(); + + //Make sure that this doesn't include every file. + configuration.setInt("hbase.hstore.compaction.max", max); + configuration.setInt("hbase.hstore.compaction.min", min); + configuration.setFloat("hbase.hstore.compaction.ratio", ratio); + + cp.store = createMockStore(); + + //Now set the conf. + cp.setConf(configuration); + } + + @Test + public void testCompactions() throws Exception { + + + + //Some special cases. 
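+    // Each row below is a store-file size distribution (arbitrary units) that is
+    // run through two selection/compaction rounds; afterwards the seeded random
+    // workloads further down are replayed for 20 rounds each. The harness
+    // accumulates total bytes rewritten and the net change in file count so the
+    // parameterized policies can be compared on write amplification versus
+    // how many files remain uncompacted.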
+ int[][] fileSizes = new int[][] { + {1000,350,200,100,20,10,10}, + {1000,450,200,100,20,10,10}, + {1000,550,200,100,20,10,10}, + {1000,650,200,100,20,10,10}, + {1000,250,25,25,25,25,25,25}, + {25,25,25,25,25,25, 500}, + {1000,1000,1000,1000,900}, + {107, 50, 10, 10, 10, 10}, + {2000, 107, 50, 10, 10, 10, 10}, + {9,8,7,6,5,4,3,2,1}, + {11,18,9,8,7,6,5,4,3,2,1}, + {110,18,18,18,18,9,8,7,6,5,4,3,2,1} + }; + + for(int[] fs:fileSizes) { + List storeFiles = createStoreFileList(fs); + startingFiles += storeFiles.size(); + + List sTwo = runIteration(storeFiles); + List sThree = runIteration(sTwo); + endingFiles += sThree.size(); + } + + long[] seeds = new long[] { + -71810, 43328, 45852, -97821, 72864, + -27902, -9832, 92684, 4900, 25394, + -30447, 99580, -25731, -22047, 81157, + 66516, -99093, -48139, -82778, 60701, + 41455, 82852, -91176, -97101, 37952, + 61178, 19021, -64174, 10997, -66468, + -75501, -41283, -79849, -30542, 89635, + -90500, 32912, -11541, 44923, -71162, + -56932, -98994, -78222, 85530, -15455, + -79379, 42617, 19568, -72522, 57925, + 7131, -90911, 45214, 60855, 58307, + -60695, -12032, 12846, 42639, 84767, + -88972, -89194, -22965, -29059, -82397, + 72363, -53877, 8648, -59686, 94827, + -7513, -97734, 72303, -21442, -84681, + -34609, -8956, 3613, 69288, -16987, + 81842, -63248, 9332, -90234, 51453, + -3835, 36257, 92924, -34882, -94294, + 2611, -99898, 6331, 7079, 53965, + -87720, -16930, 95989, 2361, 83507, + -93274, 31224, 16772, -31809, 17517}; + + for (long seed:seeds) { + Random random = new Random(seed); + List storeFiles = new LinkedList(); + + + storeFiles.add(createMockStoreFile(random.nextInt(1700) + 500)); + storeFiles.add(createMockStoreFile(random.nextInt(700) + 400)); + storeFiles.add(createMockStoreFile(random.nextInt(400) + 300)); + storeFiles.add(createMockStoreFile(random.nextInt(400) + 200)); + + + for (int i =0; i< 20; i++) { + storeFiles.add(createMockStoreFile(random.nextInt(30) + 30)); + storeFiles.add(createMockStoreFile(random.nextInt(30) + 30)); + storeFiles.add(createMockStoreFile(random.nextInt(30) + 30)); + startingFiles += storeFiles.size(); + storeFiles = runIteration(storeFiles); + endingFiles += storeFiles.size(); + } + } + + + System.out.println( + this.name + + "\t" + max + + "\t"+min + + "\t"+ratio + + "\t"+ written + + "\t" + (endingFiles - startingFiles)); + } + + + private List runIteration(List storeFiles) throws IOException { + List startingStoreFiles = storeFiles; + storeFiles = new ArrayList(storeFiles); + iterations += 1; + CompactSelection sel = cp.selectCompaction(new ArrayList(storeFiles), false, false); + int newFileSize = 0; + + List filesToCompact = sel.getFilesToCompact(); + + if (!filesToCompact.isEmpty()) { + + + storeFiles = new ArrayList(storeFiles); + storeFiles.removeAll(filesToCompact); + + for (StoreFile storeFile : filesToCompact) { + newFileSize += storeFile.getReader().length(); + } + storeFiles.add(createMockStoreFile(newFileSize)); + } +// LOG.debug("Iter: started: " + sfToString(startingStoreFiles) +// +" wrote " + writtenInRun + "mb result " +// + sfToString(storeFiles)); + written += newFileSize; + return storeFiles; + } + + private String sfToString(List storeFiles) { + String res = "["; + + for(StoreFile s:storeFiles) { + res += s.getReader().length() + ","; + } + return res+ "]"; + } + + private List createStoreFileList(int[] fs) { + List storeFiles = new LinkedList(); + for(int fileSize:fs) { + storeFiles.add(createMockStoreFile(fileSize)); + } + return storeFiles; + } + + + private StoreFile 
createMockStoreFile(int sizeMb) { + StoreFile mockSf = mock(StoreFile.class); + StoreFile.Reader reader = mock(StoreFile.Reader.class); + + when(reader.getSequenceID()).thenReturn(-1l); + when(reader.getTotalUncompressedBytes()).thenReturn(Long.valueOf(sizeMb)); + when(reader.length()).thenReturn(Long.valueOf(sizeMb)); + + when(mockSf.excludeFromMinorCompaction()).thenReturn(false); + when(mockSf.getReader()).thenReturn(reader); + when(mockSf.toString()).thenReturn("MockStoreFile File Size: " + sizeMb); + + return mockSf; + } + + private HStore createMockStore() { + HStore s = mock(HStore.class); + when(s.getStoreFileTtl()).thenReturn(Long.MAX_VALUE); + + return s; + } + +} -- 1.7.10.2 (Apple Git-33)