diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 690a616..b02f5af 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -67,7 +67,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; -import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions; +import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactionTracker; +import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; @@ -147,7 +148,9 @@ public class HStore implements Store { final StoreEngine storeEngine; - private OffPeakCompactions offPeakCompactions; + private static final OffPeakCompactionTracker offPeakCompactionTracker = + new OffPeakCompactionTracker(); + private final OffPeakHours offPeakHours; private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10; private static int flush_retries_number; @@ -199,7 +202,7 @@ public class HStore implements Store { // to clone it? scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator); this.memstore = new MemStore(conf, this.comparator); - this.offPeakCompactions = new OffPeakCompactions(conf); + this.offPeakHours = OffPeakHours.getInstance(conf); // Setting up cache configuration for this family this.cacheConf = new CacheConfig(conf, family); @@ -1183,13 +1186,21 @@ public class HStore implements Store { // Normal case - coprocessor is not overriding file selection. if (!compaction.hasSelection()) { boolean isUserCompaction = priority == Store.PRIORITY_USER; - boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest(); - compaction.select(this.filesCompacting, isUserCompaction, + boolean mayUseOffPeak = offPeakHours.isOffPeakHour() && + offPeakCompactionTracker.tryStartOffPeakRequest(); + try { + compaction.select(this.filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); + } catch (IOException e) { + if (mayUseOffPeak) { + offPeakCompactionTracker.endOffPeakRequest(); + } + throw e; + } assert compaction.hasSelection(); if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) { // Compaction policy doesn't want to take advantage of off-peak. - this.offPeakCompactions.endOffPeakRequest(); + offPeakCompactionTracker.endOffPeakRequest(); } } if (this.getCoprocessorHost() != null) { @@ -1249,7 +1260,7 @@ public class HStore implements Store { private void finishCompactionRequest(CompactionRequest cr) { this.region.reportCompactionRequestEnd(cr.isMajor()); if (cr.isOffPeak()) { - this.offPeakCompactions.endOffPeakRequest(); + offPeakCompactionTracker.endOffPeakRequest(); cr.setOffPeak(false); } synchronized (filesCompacting) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactionTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactionTracker.java new file mode 100644 index 0000000..975c472 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactionTracker.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class OffPeakCompactionTracker { + private static final Log LOG = LogFactory.getLog(OffPeakCompactionTracker.class); + + /** + * Number of off peak compactions either in the compaction queue or happening now. + * Currently only one off peak compaction is present in the compaction queue. + */ + private final AtomicLong numOutstanding = new AtomicLong(); + + /** + * Tries making the compaction off-peak. + * @return Whether the compaction can be made off-peak. + */ + public boolean tryStartOffPeakRequest() { + return numOutstanding.compareAndSet(0, 1); + } + + /** + * The current compaction finished, so reset the off peak compactions count. + */ + public void endOffPeakRequest() { + boolean trueExpected = numOutstanding.compareAndSet(1, 0); + assert trueExpected; + LOG.info("Compaction done."); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java deleted file mode 100644 index f93a668..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakCompactions.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver.compactions; - -import java.util.Calendar; -import java.util.GregorianCalendar; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; - -/** - * The class used to track off-peak hours and compactions. Off-peak compaction counter - * is global for the entire server, hours can be different per instance of this class, - * based on the configuration of the corresponding store. - */ -@InterfaceAudience.Private -public class OffPeakCompactions { - private static final Log LOG = LogFactory.getLog(OffPeakCompactions.class); - private final static Calendar calendar = new GregorianCalendar(); - private int offPeakStartHour; - private int offPeakEndHour; - - // TODO: replace with AtomicLong, see HBASE-7437. - /** - * Number of off peak compactions either in the compaction queue or - * happening now. Please lock compactionCountLock before modifying. - */ - private static long numOutstanding = 0; - - /** - * Lock object for numOutstandingOffPeakCompactions - */ - private static final Object compactionCountLock = new Object(); - - public OffPeakCompactions(Configuration conf) { - offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1); - offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1); - if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) { - if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) { - LOG.warn("Ignoring invalid start/end hour for peak hour : start = " + - this.offPeakStartHour + " end = " + this.offPeakEndHour + - ". Valid numbers are [0-23]"); - } - this.offPeakStartHour = this.offPeakEndHour = -1; - } - } - - /** - * Tries making the compaction off-peak. - * @return Whether the compaction can be made off-peak. - */ - public boolean tryStartOffPeakRequest() { - if (!isOffPeakHour()) return false; - synchronized(compactionCountLock) { - if (numOutstanding == 0) { - numOutstanding++; - return true; - } - } - return false; - } - - /** - * The current compaction finished, so reset the off peak compactions count - * if this was an off peak compaction. - */ - public void endOffPeakRequest() { - long newValueToLog = -1; - synchronized(compactionCountLock) { - newValueToLog = --numOutstanding; - } - LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + newValueToLog); - } - - /** - * @return whether this is off-peak hour - */ - private boolean isOffPeakHour() { - int currentHour = calendar.get(Calendar.HOUR_OF_DAY); - // If offpeak time checking is disabled just return false. - if (this.offPeakStartHour == this.offPeakEndHour) { - return false; - } - if (this.offPeakStartHour < this.offPeakEndHour) { - return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour); - } - return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour); - } - - private static boolean isValidHour(int hour) { - return (hour >= 0 && hour <= 23); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java new file mode 100644 index 0000000..295aa71 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/OffPeakHours.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.Calendar; +import java.util.GregorianCalendar; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +@InterfaceAudience.Private +public abstract class OffPeakHours { + private static final Log LOG = LogFactory.getLog(OffPeakHours.class); + + public static final OffPeakHours DISABLED = new OffPeakHours() { + @Override public boolean isOffPeakHour() { return false; } + @Override public boolean isOffPeakHour(int targetHour) { return false; } + }; + + public static OffPeakHours getInstance(Configuration conf) { + int startHour = conf.getInt("hbase.offpeak.start.hour", -1); + int endHour = conf.getInt("hbase.offpeak.end.hour", -1); + return getInstance(startHour, endHour); + } + + /** + * @param startHour inclusive + * @param endHour exclusive + */ + public static OffPeakHours getInstance(int startHour, int endHour) { + if (startHour == -1 && endHour == -1) { + return DISABLED; + } + + if (! isValidHour(startHour) || ! isValidHour(endHour)) { + if (LOG.isWarnEnabled()) { + LOG.warn("Ignoring invalid start/end hour for peak hour : start = " + + startHour + " end = " + endHour + + ". Valid numbers are [0-23]"); + } + return DISABLED; + } + + if (startHour == endHour) { + return DISABLED; + } + + return new OffPeakHoursImpl(startHour, endHour); + } + + private static boolean isValidHour(int hour) { + return 0 <= hour && hour <= 23; + } + + /** + * @return whether {@code targetHour} is off-peak hour + */ + public abstract boolean isOffPeakHour(int targetHour); + + /** + * @return whether it is off-peak hour + */ + public abstract boolean isOffPeakHour(); + + private static class OffPeakHoursImpl extends OffPeakHours { + final int startHour; + final int endHour; + + /** + * @param startHour inclusive + * @param endHour exclusive + */ + OffPeakHoursImpl(int startHour, int endHour) { + this.startHour = startHour; + this.endHour = endHour; + } + + @Override + public boolean isOffPeakHour() { + Calendar calendar = new GregorianCalendar(); + int currentHour = calendar.get(Calendar.HOUR_OF_DAY); + return isOffPeakHour(currentHour); + } + + @Override + public boolean isOffPeakHour(int targetHour) { + if (startHour <= endHour) { + return startHour <= targetHour && targetHour < endHour; + } + return targetHour < endHour || startHour <= targetHour; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java deleted file mode 100644 index 97cb289..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestOffPeakCompactions.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import static org.junit.Assert.*; - -import java.io.IOException; -import java.util.Calendar; -import java.util.GregorianCalendar; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.SmallTests; -import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(SmallTests.class) -public class TestOffPeakCompactions { - private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - - @Test - public void testOffPeakHours() throws IOException { - Calendar calendar = new GregorianCalendar(); - int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY); - LOG.debug("Hour of day = " + hourOfDay); - int hourPlusOne = ((hourOfDay+1)%24); - int hourMinusOne = ((hourOfDay-1+24)%24); - int hourMinusTwo = ((hourOfDay-2+24)%24); - - Configuration conf = TEST_UTIL.getConfiguration(); - OffPeakCompactions opc = new OffPeakCompactions(conf); - LOG.debug("Testing without off-peak settings..."); - assertFalse(opc.tryStartOffPeakRequest()); - - // set peak hour to current time and check compact selection - conf.setLong("hbase.offpeak.start.hour", hourMinusOne); - conf.setLong("hbase.offpeak.end.hour", hourPlusOne); - opc = new OffPeakCompactions(conf); - LOG.debug("Testing compact selection with off-peak settings (" + - hourMinusOne + ", " + hourPlusOne + ")"); - assertTrue(opc.tryStartOffPeakRequest()); - opc.endOffPeakRequest(); - - // set peak hour outside current selection and check compact selection - conf.setLong("hbase.offpeak.start.hour", hourMinusTwo); - conf.setLong("hbase.offpeak.end.hour", hourMinusOne); - opc = new OffPeakCompactions(conf); - assertFalse(opc.tryStartOffPeakRequest()); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java new file mode 100644 index 0000000..d2f955c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestOffPeakHours.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.SmallTests; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(SmallTests.class) +public class TestOffPeakHours { + private static HBaseTestingUtility testUtil; + + @BeforeClass + public static void setUpClass() { + testUtil = new HBaseTestingUtility(); + } + + private int hourOfDay; + private int hourPlusOne; + private int hourMinusOne; + private int hourMinusTwo; + private Configuration conf; + + @Before + public void setUp() { + hourOfDay = 15; + hourPlusOne = ((hourOfDay+1)%24); + hourMinusOne = ((hourOfDay-1+24)%24); + hourMinusTwo = ((hourOfDay-2+24)%24); + conf = testUtil.getConfiguration(); + } + + @Test + public void testWithoutSettings() { + Configuration conf = testUtil.getConfiguration(); + OffPeakHours target = OffPeakHours.getInstance(conf); + assertFalse(target.isOffPeakHour(hourOfDay)); + } + + @Test + public void testSetPeakHourToTargetTime() { + conf.setLong("hbase.offpeak.start.hour", hourMinusOne); + conf.setLong("hbase.offpeak.end.hour", hourPlusOne); + OffPeakHours target = OffPeakHours.getInstance(conf); + assertTrue(target.isOffPeakHour(hourOfDay)); + } + + @Test + public void testSetPeakHourOutsideCurrentSelection() { + conf.setLong("hbase.offpeak.start.hour", hourMinusTwo); + conf.setLong("hbase.offpeak.end.hour", hourMinusOne); + OffPeakHours target = OffPeakHours.getInstance(conf); + assertFalse(target.isOffPeakHour(hourOfDay)); + } +}