From a154fa72b6141aff858e3fbfc6463ab465a0caa4 Mon Sep 17 00:00:00 2001 From: Elliott Clark Date: Tue, 5 Feb 2013 11:12:56 -0800 Subject: [PATCH] Compactions not sorting based on size anymore. --- .../hadoop/hbase/regionserver/StoreFile.java | 6 +- .../DefaultCompactionPolicyWithSort.java | 394 ++++++++++++++++++++ .../DefaultCompactionPolicyWithSortTakeSmall.java | 393 +++++++++++++++++++ .../regionserver/TestDefaultCompactSelection.java | 6 +- .../compactions/TestDefaultCompactionPolicy.java | 139 +++++++ .../compactions/TestPerfCompactionPolicy.java | 186 +++++++++ 6 files changed, 1118 insertions(+), 6 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSort.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSortTakeSmall.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 3eb6d25..871d88d 100644 --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -1761,7 +1761,7 @@ public class StoreFile { /** * Useful comparators for comparing StoreFiles. */ - abstract static class Comparators { + public abstract static class Comparators { /** * Comparator that compares based on the Sequence Ids of the * the StoreFiles. Bulk loads that did not request a seq ID @@ -1770,7 +1770,7 @@ public class StoreFile { * the bulkLoadTime is used to determine the ordering. * If there are ties, the path name is used as a tie-breaker. */ - static final Comparator SEQ_ID = + public static final Comparator SEQ_ID = Ordering.compound(ImmutableList.of( Ordering.natural().onResultOf(new GetSeqId()), Ordering.natural().onResultOf(new GetBulkTime()), @@ -1802,7 +1802,7 @@ public class StoreFile { /** * FILE_SIZE = descending sort StoreFiles (largest --> smallest in size) */ - static final Comparator FILE_SIZE = Ordering.natural().reverse() + public static final Comparator FILE_SIZE = Ordering.natural().reverse() .onResultOf(new Function() { @Override public Long apply(StoreFile sf) { diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSort.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSort.java new file mode 100644 index 0000000..b3ea89d --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSort.java @@ -0,0 +1,394 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. + */ +@InterfaceAudience.Private +public class DefaultCompactionPolicyWithSort extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicyWithSort.class); + private final static Calendar calendar = new GregorianCalendar(); + + public DefaultCompactionPolicyWithSort() { + compactor = new DefaultCompactor(this); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + // Preliminary compaction subject to filters + Collections.sort(candidateFiles, StoreFile.Comparators.FILE_SIZE); + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.store.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. 
+ boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. + * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. 
Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(comConf.getMaxFilesToCompact(), + candidates.getFilesToCompact().size()); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *

+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + while (countOfFiles - start >= comConf.getMinFilesToCompact() && + fileSizes[start] > Math.max(comConf.getMinCompactSize(), + (long) (sumSize[start + 1] * ratio))) { + ++start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.store.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. + if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSortTakeSmall.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSortTakeSmall.java new file mode 100644 index 0000000..236c41c --- /dev/null +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicyWithSortTakeSmall.java @@ -0,0 +1,393 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StoreUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.GregorianCalendar; +import java.util.List; +import java.util.Random; + +/** + * The default algorithm for selecting files for compaction. + * Combines the compaction configuration and the provisional file selection that + * it's given to produce the list of suitable candidates for compaction. + */ +@InterfaceAudience.Private +public class DefaultCompactionPolicyWithSortTakeSmall extends CompactionPolicy { + + private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicyWithSortTakeSmall.class); + private final static Calendar calendar = new GregorianCalendar(); + + public DefaultCompactionPolicyWithSortTakeSmall() { + compactor = new DefaultCompactor(this); + } + + /** + * @param candidateFiles candidate files, ordered from oldest to newest + * @return subset copy of candidate list that meets compaction criteria + * @throws java.io.IOException + */ + public CompactSelection selectCompaction(List candidateFiles, + boolean isUserCompaction, boolean forceMajor) + throws IOException { + // Preliminary compaction subject to filters + Collections.sort(candidateFiles, StoreFile.Comparators.FILE_SIZE); + CompactSelection candidateSelection = new CompactSelection(candidateFiles); + long cfTtl = this.store.getStoreFileTtl(); + if (!forceMajor) { + // If there are expired files, only select them so that compaction deletes them + if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) { + CompactSelection expiredSelection = selectExpiredStoreFiles( + candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl); + if (expiredSelection != null) { + return expiredSelection; + } + } + candidateSelection = skipLargeFiles(candidateSelection); + } + + // Force a major compaction if this is a user-requested major compaction, + // or if we do not have too many files to compact and this was requested + // as a major compaction. + // Or, if there are any references among the candidates. 
+ boolean majorCompaction = ( + (forceMajor && isUserCompaction) + || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact())) + && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact())) + || StoreUtils.hasReferences(candidateSelection.getFilesToCompact()) + ); + + if (!majorCompaction) { + // we're doing a minor compaction, let's see what files are applicable + candidateSelection = filterBulk(candidateSelection); + candidateSelection = applyCompactionPolicy(candidateSelection); + candidateSelection = checkMinFilesCriteria(candidateSelection); + } + candidateSelection = + removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction); + return candidateSelection; + } + + /** + * Select the expired store files to compact + * + * @param candidates the initial set of storeFiles + * @param maxExpiredTimeStamp + * The store file will be marked as expired if its max time stamp is + * less than this maxExpiredTimeStamp. + * @return A CompactSelection contains the expired store files as + * filesToCompact + */ + private CompactSelection selectExpiredStoreFiles( + CompactSelection candidates, long maxExpiredTimeStamp) { + List filesToCompact = candidates.getFilesToCompact(); + if (filesToCompact == null || filesToCompact.size() == 0) + return null; + ArrayList expiredStoreFiles = null; + boolean hasExpiredStoreFiles = false; + CompactSelection expiredSFSelection = null; + + for (StoreFile storeFile : filesToCompact) { + if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) { + LOG.info("Deleting the expired store file by compaction: " + + storeFile.getPath() + " whose maxTimeStamp is " + + storeFile.getReader().getMaxTimestamp() + + " while the max expired timestamp is " + maxExpiredTimeStamp); + if (!hasExpiredStoreFiles) { + expiredStoreFiles = new ArrayList(); + hasExpiredStoreFiles = true; + } + expiredStoreFiles.add(storeFile); + } + } + + if (hasExpiredStoreFiles) { + expiredSFSelection = new CompactSelection(expiredStoreFiles); + } + return expiredSFSelection; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all files above maxCompactSize + * Also save all references. We MUST compact them + */ + private CompactSelection skipLargeFiles(CompactSelection candidates) { + int pos = 0; + while (pos < candidates.getFilesToCompact().size() && + candidates.getFilesToCompact().get(pos).getReader().length() > + comConf.getMaxCompactSize() && + !candidates.getFilesToCompact().get(pos).isReference()) { + ++pos; + } + if (pos > 0) { + LOG.debug("Some files are too large. 
Excluding " + pos + + " files from compaction candidates"); + candidates.clearSubList(0, pos); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * exclude all bulk load files if configured + */ + private CompactSelection filterBulk(CompactSelection candidates) { + candidates.getFilesToCompact().removeAll(Collections2.filter( + candidates.getFilesToCompact(), + new Predicate() { + @Override + public boolean apply(StoreFile input) { + return input.excludeFromMinorCompaction(); + } + })); + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * take upto maxFilesToCompact from the start + */ + private CompactSelection removeExcessFiles(CompactSelection candidates, + boolean isUserCompaction, boolean isMajorCompaction) { + int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact(); + if (excess > 0) { + if (isMajorCompaction && isUserCompaction) { + LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() + + " files because of a user-requested major compaction"); + } else { + LOG.debug("Too many admissible files. Excluding " + excess + + " files from compaction candidates"); + candidates.clearSubList(0,excess); + } + } + return candidates; + } + /** + * @param candidates pre-filtrate + * @return filtered subset + * forget the compactionSelection if we don't have enough files + */ + private CompactSelection checkMinFilesCriteria(CompactSelection candidates) { + int minFiles = comConf.getMinFilesToCompact(); + if (candidates.getFilesToCompact().size() < minFiles) { + if(LOG.isDebugEnabled()) { + LOG.debug("Not compacting files because we only have " + + candidates.getFilesToCompact().size() + + " files ready for compaction. Need " + minFiles + " to initiate."); + } + candidates.emptyFileList(); + } + return candidates; + } + + /** + * @param candidates pre-filtrate + * @return filtered subset + * -- Default minor compaction selection algorithm: + * choose CompactSelection from candidates -- + * First exclude bulk-load files if indicated in configuration. + * Start at the oldest file and stop when you find the first file that + * meets compaction criteria: + * (1) a recently-flushed, small file (i.e. <= minCompactSize) + * OR + * (2) within the compactRatio of sum(newer_files) + * Given normal skew, any newer files will also meet this criteria + *

+ * Additional Note: + * If fileSizes.size() >> maxFilesToCompact, we will recurse on + * compact(). Consider the oldest files first to avoid a + * situation where we always compact [end-threshold,end). Then, the + * last file becomes an aggregate of the previous compactions. + * + * normal skew: + * + * older ----> newer (increasing seqID) + * _ + * | | _ + * | | | | _ + * --|-|- |-|- |-|---_-------_------- minCompactSize + * | | | | | | | | _ | | + * | | | | | | | | | | | | + * | | | | | | | | | | | | + */ + CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException { + if (candidates.getFilesToCompact().isEmpty()) { + return candidates; + } + + // we're doing a minor compaction, let's see what files are applicable + int start = 0; + double ratio = comConf.getCompactionRatio(); + if (isOffPeakHour() && candidates.trySetOffpeak()) { + ratio = comConf.getCompactionRatioOffPeak(); + LOG.info("Running an off-peak compaction, selection ratio = " + ratio + + ", numOutstandingOffPeakCompactions is now " + + CompactSelection.getNumOutStandingOffPeakCompactions()); + } + + // get store file sizes for incremental compacting selection. + int countOfFiles = candidates.getFilesToCompact().size(); + long[] fileSizes = new long[countOfFiles]; + long[] sumSize = new long[countOfFiles]; + for (int i = countOfFiles - 1; i >= 0; --i) { + StoreFile file = candidates.getFilesToCompact().get(i); + fileSizes[i] = file.getReader().length(); + // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo + int tooFar = i + comConf.getMaxFilesToCompact() - 1; + sumSize[i] = fileSizes[i] + + ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0) + - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); + } + + + while (countOfFiles - start >= comConf.getMinFilesToCompact() && + fileSizes[start] > Math.max(comConf.getMinCompactSize(), + (long) (sumSize[start + 1] * ratio))) { + ++start; + } + if (start < countOfFiles) { + LOG.info("Default compaction algorithm has selected " + (countOfFiles - start) + + " files from " + countOfFiles + " candidates"); + } + + candidates = candidates.getSubList(start, countOfFiles); + + return candidates; + } + + /* + * @param filesToCompact Files to compact. Can be null. + * @return True if we should run a major compaction. + */ + public boolean isMajorCompaction(final List filesToCompact) + throws IOException { + boolean result = false; + long mcTime = getNextMajorCompactTime(filesToCompact); + if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) { + return result; + } + // TODO: Use better method for determining stamp of last major (HBASE-2990) + long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact); + long now = System.currentTimeMillis(); + if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) { + // Major compaction time has elapsed. + long cfTtl = this.store.getStoreFileTtl(); + if (filesToCompact.size() == 1) { + // Single file + StoreFile sf = filesToCompact.get(0); + Long minTimestamp = sf.getMinimumTimestamp(); + long oldest = (minTimestamp == null) + ? 
Long.MIN_VALUE + : now - minTimestamp.longValue(); + if (sf.isMajorCompaction() && + (cfTtl == HConstants.FOREVER || oldest < cfTtl)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping major compaction of " + this + + " because one (major) compacted file only and oldestTime " + + oldest + "ms is < ttl=" + cfTtl); + } + } else if (cfTtl != HConstants.FOREVER && oldest > cfTtl) { + LOG.debug("Major compaction triggered on store " + this + + ", because keyvalues outdated; time since last major compaction " + + (now - lowTimestamp) + "ms"); + result = true; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Major compaction triggered on store " + this + + "; time since last major compaction " + (now - lowTimestamp) + "ms"); + } + result = true; + } + } + return result; + } + + public long getNextMajorCompactTime(final List filesToCompact) { + // default = 24hrs + long ret = comConf.getMajorCompactionPeriod(); + if (ret > 0) { + // default = 20% = +/- 4.8 hrs + double jitterPct = comConf.getMajorCompactionJitter(); + if (jitterPct > 0) { + long jitter = Math.round(ret * jitterPct); + // deterministic jitter avoids a major compaction storm on restart + Integer seed = StoreUtils.getDeterministicRandomSeed(filesToCompact); + if (seed != null) { + double rnd = (new Random(seed)).nextDouble(); + ret += jitter - Math.round(2L * jitter * rnd); + } else { + ret = 0; // no storefiles == no major compaction + } + } + } + return ret; + } + + /** + * @param compactionSize Total size of some compaction + * @return whether this should be a large or small compaction + */ + public boolean throttleCompaction(long compactionSize) { + return compactionSize > comConf.getThrottlePoint(); + } + + /** + * @param numCandidates Number of candidate store files + * @return whether a compactionSelection is possible + */ + public boolean needsCompaction(int numCandidates) { + return numCandidates > comConf.getMinFilesToCompact(); + } + + /** + * @return whether this is off-peak hour + */ + private boolean isOffPeakHour() { + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + int startHour = comConf.getOffPeakStartHour(); + int endHour = comConf.getOffPeakEndHour(); + // If offpeak time checking is disabled just return false. + if (startHour == endHour) { + return false; + } + if (startHour < endHour) { + return (currentHour >= startHour && currentHour < endHour); + } + return (currentHour >= startHour || currentHour < endHour); + } +} \ No newline at end of file diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java index 5d3358d..0502b2c 100644 --- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -259,7 +259,7 @@ public class TestDefaultCompactSelection extends TestCase { */ // don't exceed max file compact threshold // note: file selection starts with largest to smallest. 
- compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1); /* MAJOR COMPACTION */ // if a major compaction has been forced, then compact everything @@ -270,7 +270,7 @@ public class TestDefaultCompactSelection extends TestCase { compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12); // don't exceed max file compact threshold, even with major compaction store.forceMajor = true; - compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + compactEquals(sfCreate(7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1); store.forceMajor = false; // if we exceed maxCompactSize, downgrade to minor // if not, it creates a 'snowball effect' when files >> maxCompactSize: @@ -295,7 +295,7 @@ public class TestDefaultCompactSelection extends TestCase { // reference files shouldn't obey max threshold compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12); // reference files should obey max file compact to avoid OOM - compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 7, 6, 5, 4, 3); + compactEquals(sfCreate(true, 7, 6, 5, 4, 3, 2, 1), 5, 4, 3, 2, 1); // empty case compactEquals(new ArrayList() /* empty */); diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java new file mode 100644 index 0000000..bd0c5e7 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDefaultCompactionPolicy.java @@ -0,0 +1,139 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.*; + +@Category(SmallTests.class) +public class TestDefaultCompactionPolicy { + + Configuration configuration; + DefaultCompactionPolicy dcp; + List candidates; + + + @Before + public void setUp() { + + configuration = HBaseConfiguration.create(); + + //Make sure that this doesn't include every file. + configuration.setInt("hbase.hstore.compaction.max", 4); + //But it must include three files at least. 
+ configuration.setInt("hbase.hstore.compaction.min", 3); + + dcp = new DefaultCompactionPolicy(); + //Assign the store first so that updateConfiguration will create a compactionconf. + dcp.store = createMockStore(); + + //Now set the conf. + dcp.setConf(configuration); + + candidates = new ArrayList(5); + } + + @Test + public void testSelectCompactionMaxFiles() throws Exception { + for(int i = 100; i < 350; i++) { + candidates.add(createMockStoreFile(i)); + } + + CompactSelection sel = dcp.selectCompaction(candidates, false, false); + + //4 is the max files so assume the rest got dropped. + assertEquals(4, sel.getFilesToCompact().size()); + } + + @Test + public void testSelectCompactionMinFiles() throws Exception { + candidates.add(createMockStoreFile(101)); + candidates.add(createMockStoreFile(102)); + CompactSelection sel = dcp.selectCompaction(candidates, false, false); + assertEquals(0, sel.getFilesToCompact().size()); // 0 because 2 < min num files. + + } + + @Test + public void testSelectionCompactionRation() throws Exception { + + // Add four files. Which is equal to the max. However one should be excluded due to + // being too large based on the ratio. + StoreFile large = createMockStoreFile((int)(310 * dcp.comConf.getCompactionRatio())); + candidates.add(large); + candidates.add(createMockStoreFile(100)); + candidates.add(createMockStoreFile(100)); + candidates.add(createMockStoreFile(100)); + + CompactSelection sel = dcp.selectCompaction(candidates, false, false); + assertFalse(sel.getFilesToCompact().contains(large)); + } + +// @Test +// public void testSelectCompactionForOutOfOrderSize() throws Exception { +// +// //Order is important here. We want to make sure that sorting takes place. +// candidates.add(createMockStoreFile(256)); +// candidates.add(createMockStoreFile(256)); +// candidates.add(createMockStoreFile(256)); +// StoreFile large = createMockStoreFile(746); +// candidates.add(large); +// candidates.add(createMockStoreFile(256)); +// +// CompactSelection sel = dcp.selectCompaction(candidates, false, false); +// assertFalse(sel.getFilesToCompact().contains(large)); +// } + + private StoreFile createMockStoreFile(int sizeMb) { + StoreFile mockSf = mock(StoreFile.class); + StoreFile.Reader reader = mock(StoreFile.Reader.class); + + when(reader.getSequenceID()).thenReturn(-1l); + when(reader.getTotalUncompressedBytes()).thenReturn(1024l * 1024l * sizeMb); + when(reader.length()).thenReturn(1024l * 1024l * sizeMb); + + when(mockSf.excludeFromMinorCompaction()).thenReturn(false); + when(mockSf.getReader()).thenReturn(reader); + when(mockSf.toString()).thenReturn("MockStoreFile File Size: " + sizeMb); + + return mockSf; + } + + private HStore createMockStore() { + HStore s = mock(HStore.class); + when(s.getStoreFileTtl()).thenReturn(Long.MAX_VALUE); + + return s; + } + +} diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java new file mode 100644 index 0000000..96e7926 --- /dev/null +++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestPerfCompactionPolicy.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.compactions; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(Parameterized.class) +public class TestPerfCompactionPolicy { + + static final Log LOG = LogFactory.getLog(TestPerfCompactionPolicy.class); + + private final String name; + private final CompactionPolicy cp; + private final Configuration configuration; + private long written = 0; + private long iterations = 0; + private long files = 0; + + @Parameterized.Parameters + public static Collection data() { + return Arrays.asList(new Object[][] { + {"Default", new DefaultCompactionPolicy()}, + {"DefaultWithSort", new DefaultCompactionPolicyWithSort()}, + {"DefaultWithSortTakeSmall", new DefaultCompactionPolicyWithSortTakeSmall()}, + + }); + } + + public TestPerfCompactionPolicy(String name, CompactionPolicy cp) { + + org.apache.log4j.Logger.getLogger(CompactionConfiguration.class). + setLevel(org.apache.log4j.Level.ERROR); + + org.apache.log4j.Logger.getLogger(cp.getClass()). + setLevel(org.apache.log4j.Level.ERROR); + + this.name = name; + this.cp = cp; + + + configuration = HBaseConfiguration.create(); + + //Make sure that this doesn't include every file. + configuration.setInt("hbase.hstore.compaction.max", 4); + configuration.setInt("hbase.hstore.compaction.min", 2); + + cp.store = createMockStore(); + + //Now set the conf. 
+ cp.setConf(configuration); + } + + @Test + public void testCompactions() throws Exception { + + int[][] fileSizes = new int[][] { + {1000,350,200,100,20,10,10}, + {1000,450,200,100,20,10,10}, + {1000,550,200,100,20,10,10}, + {1000,650,200,100,20,10,10}, + {1000,250,25,25,25,25,25,25}, + {25,25,25,25,25,25, 500}, + {1000,1000,1000,1000,900}, + {107, 50, 10, 10, 10, 10}, + {2000, 107, 50, 10, 10, 10, 10}, + {9,8,7,6,5,4,3,2,1}, + {11,18,9,8,7,6,5,4,3,2,1}, + {110,18,18,18,18,9,8,7,6,5,4,3,2,1} + }; + + for(int[] fs:fileSizes) { + runIteration(fs); + } + + + LOG.debug(this.name + " wrote " + + written + + "mb in " + + iterations + + " compactions leaving " + + files + + " files"); + } + + private void runIteration(int[] fs) throws IOException { + List storeFiles = createStoreFileList(fs); + long writtenInRun = 0; + while (storeFiles.size() > 1) { + iterations += 1; + CompactSelection sel = cp.selectCompaction(new ArrayList(storeFiles), false, false); + int newFileSize = 0; + + List filesToCompact = sel.getFilesToCompact(); + if (filesToCompact.isEmpty()) { + break; + } + + storeFiles = new ArrayList(storeFiles); + storeFiles.removeAll(filesToCompact); + + for(StoreFile storeFile: filesToCompact) { + newFileSize += storeFile.getReader().length(); + } + storeFiles.add(createMockStoreFile(newFileSize)); + writtenInRun += newFileSize; + } + //LOG.debug("Finished single test wrote " + writtenInRun + " leaving " + sfToString(storeFiles)); + written += writtenInRun; + files += storeFiles.size(); + } + + private String sfToString(List storeFiles) { + String res = "["; + + for(StoreFile s:storeFiles) { + res += s.getReader().length() + ","; + } + return res+ "]"; + } + + private List createStoreFileList(int[] fs) { + List storeFiles = new LinkedList(); + for(int fileSize:fs) { + storeFiles.add(createMockStoreFile(fileSize)); + } + return storeFiles; + } + + + private StoreFile createMockStoreFile(int sizeMb) { + StoreFile mockSf = mock(StoreFile.class); + StoreFile.Reader reader = mock(StoreFile.Reader.class); + + when(reader.getSequenceID()).thenReturn(-1l); + when(reader.getTotalUncompressedBytes()).thenReturn(Long.valueOf(sizeMb)); + when(reader.length()).thenReturn(Long.valueOf(sizeMb)); + + when(mockSf.excludeFromMinorCompaction()).thenReturn(false); + when(mockSf.getReader()).thenReturn(reader); + when(mockSf.toString()).thenReturn("MockStoreFile File Size: " + sizeMb); + + return mockSf; + } + + private HStore createMockStore() { + HStore s = mock(HStore.class); + when(s.getStoreFileTtl()).thenReturn(Long.MAX_VALUE); + + return s; + } + +} -- 1.7.10.2 (Apple Git-33)
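A minimal, self-contained sketch of the selection pipeline exercised above: sort the candidates largest to smallest (StoreFile.Comparators.FILE_SIZE), apply the ratio rule from applyCompactionPolicy(), then trim any excess from the large end the way the WithSortTakeSmall variant does with clearSubList(0, excess). The class name and the constant values below are illustrative stand-ins only, not part of the patch; the real values come from CompactionConfiguration and the individual test setups. With these assumed values the example reproduces the updated TestDefaultCompactSelection expectation of {5, 4, 3, 2, 1} for input sizes {7, 6, 5, 4, 3, 2, 1}.

import java.util.Arrays;

public class TakeSmallSelectionSketch {

  static final int MIN_FILES = 3;           // assumed hbase.hstore.compaction.min
  static final int MAX_FILES = 5;           // assumed hbase.hstore.compaction.max
  static final double RATIO = 1.2;          // assumed hbase.hstore.compaction.ratio
  static final long MIN_COMPACT_SIZE = 10;  // assumed minimum compact size

  static long[] select(long[] candidates) {
    // 1. Sort largest -> smallest, mirroring the FILE_SIZE comparator made public in this patch.
    long[] sizes = candidates.clone();
    Arrays.sort(sizes);
    for (int i = 0; i < sizes.length / 2; i++) {
      long tmp = sizes[i];
      sizes[i] = sizes[sizes.length - 1 - i];
      sizes[sizes.length - 1 - i] = tmp;
    }

    // 2. Ratio rule from applyCompactionPolicy(): skip a file while it is larger than
    //    RATIO times the windowed sum of the files that follow it.
    int n = sizes.length;
    long[] sumSize = new long[n];
    for (int i = n - 1; i >= 0; --i) {
      int tooFar = i + MAX_FILES - 1;
      sumSize[i] = sizes[i]
          + ((i + 1 < n) ? sumSize[i + 1] : 0)
          - ((tooFar < n) ? sizes[tooFar] : 0);
    }
    int start = 0;
    while (n - start >= MIN_FILES
        && sizes[start] > Math.max(MIN_COMPACT_SIZE, (long) (sumSize[start + 1] * RATIO))) {
      ++start;
    }
    long[] selected = Arrays.copyOfRange(sizes, start, n);

    // 3. "Take small" trimming from removeExcessFiles(): when more than MAX_FILES remain,
    //    drop the largest files (the front of the descending list) rather than the tail.
    if (selected.length > MAX_FILES) {
      selected = Arrays.copyOfRange(selected, selected.length - MAX_FILES, selected.length);
    }
    return selected;
  }

  public static void main(String[] args) {
    // Prints [5, 4, 3, 2, 1], matching the updated compactEquals expectations above.
    System.out.println(Arrays.toString(select(new long[] {7, 6, 5, 4, 3, 2, 1})));
  }
}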