### Eclipse Workspace Patch 1.0 #P apache-trunk Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (revision 1438356) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (working copy) @@ -33,6 +33,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.compactions.PriorityCompactionQueue; import com.google.common.base.Preconditions; @@ -213,6 +214,19 @@ if (this.server.isStopped()) { return; } + // Remove existed compaction request for the same store + CompactionRequest existedRequest = ((PriorityCompactionQueue) largeCompactions + .getQueue()).removeCompcationRequest(s); + if (existedRequest != null && LOG.isTraceEnabled()) { + LOG.trace("Remove existed compaction request " + existedRequest); + } + if (smallCompactions != null) { + existedRequest = ((PriorityCompactionQueue) smallCompactions.getQueue()) + .removeCompcationRequest(s); + if (existedRequest != null && LOG.isTraceEnabled()) { + LOG.trace("Remove existed compaction request " + existedRequest); + } + } CompactionRequest cr = s.requestCompaction(priority); if (cr != null) { cr.setServer(server); Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PriorityCompactionQueue.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PriorityCompactionQueue.java (revision 0) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PriorityCompactionQueue.java (revision 0) @@ -0,0 +1,119 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.regionserver.Store; + +/** + * This class delegates to the PriorityBlockingQueue but wraps all Stores in + * compaction requests that hold the priority, the date requested and the files + * to compact, see {@link CompactionRequest} + */ +@SuppressWarnings("serial") +public class PriorityCompactionQueue extends PriorityBlockingQueue { + /** Concurrent hash map of the Stores contained within the Compaction Queue */ + private final ConcurrentHashMap storesInQueue = new ConcurrentHashMap(); + + /** + * Remove the given store's existed compaction request + * @param store + * @return existed compaction request + */ + public CompactionRequest removeCompcationRequest(Store store) { + CompactionRequest existedRequest = storesInQueue.remove(store); + if (existedRequest != null && super.remove(existedRequest)) { + store.finishRequest(existedRequest); + return existedRequest; + } + return null; + } + + @Override + public boolean offer(Runnable e) { + if (!(e instanceof CompactionRequest)) { + throw new UnsupportedOperationException("Not supported for type " + + e.getClass().getName()); + } + CompactionRequest cr = (CompactionRequest) e; + boolean successful = super.offer(cr); + if (successful) { + storesInQueue.put(cr.getStore(), cr); + } + return successful; + } + + @Override + public CompactionRequest poll() { + CompactionRequest cr = (CompactionRequest) super.poll(); + if (cr != null) { + storesInQueue.remove(cr.getStore()); + } + return cr; + } + + @Override + public CompactionRequest poll(long timeout, TimeUnit unit) + throws InterruptedException { + CompactionRequest cr = (CompactionRequest) super.poll(timeout, unit); + if (cr != null) { + storesInQueue.remove(cr.getStore()); + } + return cr; + } + + @Override + public CompactionRequest take() throws InterruptedException { + CompactionRequest cr = (CompactionRequest) super.take(); + if (cr != null) { + storesInQueue.remove(cr.getStore()); + } + return cr; + } + + @Override + public boolean remove(Object o) { + if (super.remove(o)) { + assert o instanceof CompactionRequest; + storesInQueue.remove(((CompactionRequest) o).getStore()); + return true; + } + return false; + } + + @Override + public void clear() { + super.clear(); + storesInQueue.clear(); + } + + @Override + public int drainTo(Collection c) { + throw new UnsupportedOperationException("Not supported."); + } + + @Override + public int drainTo(Collection c, int maxElements) { + throw new UnsupportedOperationException("Not supported."); + } +}