diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
index 2fd2a14..522de31 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
+import org.apache.hadoop.hbase.util.CopyOnWriteArrayList2;
 
 /**
  * The compaction pipeline of a {@link CompactingMemStore}, is a FIFO queue of segments.
@@ -49,7 +50,7 @@ public class CompactionPipeline {
   public final static long ENTRY_OVERHEAD = ClassSize.LINKEDLIST_ENTRY;
 
   private final RegionServicesForStores region;
-  private LinkedList<ImmutableSegment> pipeline;
+  private CopyOnWriteArrayList2<ImmutableSegment> pipeline;
   private long version;
 
   private static final ImmutableSegment EMPTY_MEM_STORE_SEGMENT = SegmentFactory.instance()
@@ -57,7 +58,7 @@ public class CompactionPipeline {
 
   public CompactionPipeline(RegionServicesForStores region) {
     this.region = region;
-    this.pipeline = new LinkedList<ImmutableSegment>();
+    this.pipeline = new CopyOnWriteArrayList2<ImmutableSegment>();
     this.version = 0;
   }
 
@@ -84,7 +85,7 @@ public class CompactionPipeline {
     synchronized (pipeline){
       version++;
       for(int i=0; i ...
@@ ... @@ public class CompactionPipeline {
   public List<KeyValueScanner> getScanners(long readPoint, long order) {
     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(this.pipeline.size());
-    for (Segment segment : this.pipeline) {
+    for (Object o : this.pipeline) {
+      Segment segment = (Segment) o;
       scanners.add(segment.getScanner(readPoint, order));
       // The order is the Segment ordinal
       order--;
@@ -229,15 +232,25 @@ public class CompactionPipeline {
 
   public long getMinSequenceId() {
     long minSequenceId = Long.MAX_VALUE;
-    if (!isEmpty()) {
-      minSequenceId = pipeline.getLast().getMinSequenceId();
+    ListIterator<ImmutableSegment> iter = pipeline.listIterator();
+    ImmutableSegment last = null;
+    while (iter.hasNext()) {
+      last = iter.next();
+    }
+    if (last != null) {
+      minSequenceId = last.getMinSequenceId();
     }
     return minSequenceId;
   }
 
   public MemstoreSize getTailSize() {
-    if (isEmpty()) return MemstoreSize.EMPTY_SIZE;
-    return new MemstoreSize(pipeline.peekLast().keySize(), pipeline.peekLast().heapOverhead());
+    ListIterator<ImmutableSegment> iter = pipeline.listIterator();
+    ImmutableSegment last = null;
+    while (iter.hasNext()) {
+      last = iter.next();
+    }
+    if (last == null) return MemstoreSize.EMPTY_SIZE;
+    return new MemstoreSize(last.keySize(), last.heapOverhead());
   }
 
   public MemstoreSize getPipelineSize() {
@@ -260,7 +273,7 @@ public class CompactionPipeline {
       }
     }
     pipeline.removeAll(suffix);
-    pipeline.addLast(segment);
+    pipeline.add(segment);
   }
 
   private ImmutableSegment removeLast() {
@@ -269,34 +282,7 @@ public class CompactionPipeline {
   }
 
   private boolean addFirst(ImmutableSegment segment) {
-    pipeline.addFirst(segment);
+    pipeline.add(0, segment);
     return true;
   }
-
-  // debug method
-  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
-    if(suffix.isEmpty()) {
-      // empty suffix is always valid
-      return true;
-    }
-    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
-    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
-    ImmutableSegment suffixCurrent;
-    ImmutableSegment pipelineCurrent;
-    for( ; suffixBackwardIterator.hasNext(); ) {
-      if(!pipelineBackwardIterator.hasNext()) {
-        // a suffix longer than pipeline is invalid
-        return false;
-      }
-      suffixCurrent = suffixBackwardIterator.next();
-      pipelineCurrent = pipelineBackwardIterator.next();
-      if(suffixCurrent != pipelineCurrent) {
-        // non-matching suffix
-        return false;
-      }
-    }
-    // suffix matches pipeline suffix
-    return true;
-  }
-
 }
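
The read-side motivation for this swap, sketched minimally outside HBase (illustrative names, plain java.util.concurrent.CopyOnWriteArrayList, not code from this patch): a copy-on-write iterator is pinned to the array snapshot taken at creation, so read paths like getScanners(long, long) can traverse the pipeline without the synchronization a shared LinkedList would demand, even while a writer inserts at the head.

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class SnapshotIterationDemo {
      public static void main(String[] args) throws InterruptedException {
        final List<Integer> pipeline = new CopyOnWriteArrayList<Integer>();
        pipeline.add(2);
        pipeline.add(1);

        Thread reader = new Thread(new Runnable() {
          @Override
          public void run() {
            // The iterator captures the backing array once; writers replace
            // the array rather than mutate it in place, so this loop needs no
            // lock and can never see ConcurrentModificationException.
            for (int segment : pipeline) {
              System.out.println("scanning segment " + segment);
            }
          }
        });

        reader.start();
        pipeline.add(0, 3); // concurrent insert at the head, like pushHead()
        reader.join();
        System.out.println("final size: " + pipeline.size()); // 3
      }
    }

With a LinkedList the same interleaving would require the reader to hold the pipeline monitor for the whole scan, or risk a ConcurrentModificationException.
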
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CopyOnWriteArrayList2.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CopyOnWriteArrayList2.java
new file mode 100644
index 0000000..01afcbb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/CopyOnWriteArrayList2.java
@@ -0,0 +1,110 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Extension of CopyOnWriteArrayList supporting:
+ * removeLast()
+ */
+@InterfaceAudience.Private
+public class CopyOnWriteArrayList2<E> extends CopyOnWriteArrayList<E> {
+  private static final Log LOG = LogFactory.getLog(CopyOnWriteArrayList2.class);
+
+  private static Method getArrayMethod, setArrayMethod;
+  private static Field lockField;
+
+  static {
+    try {
+      Class<?> c = Class.forName("java.util.concurrent.CopyOnWriteArrayList");
+      getArrayMethod = c.getDeclaredMethod("getArray");
+      getArrayMethod.setAccessible(true);
+
+      lockField = c.getDeclaredField("lock");
+      lockField.setAccessible(true);
+
+      setArrayMethod = c.getDeclaredMethod("setArray", Object[].class);
+      setArrayMethod.setAccessible(true);
+    } catch (Exception e) {
+      throw new RuntimeException("CopyOnWriteArrayList is not available", e);
+    }
+  }
+
+  private transient ReentrantLock lock;
+
+  public CopyOnWriteArrayList2() {
+    super();
+    try {
+      lock = (ReentrantLock) lockField.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public CopyOnWriteArrayList2(Collection<? extends E> c) {
+    super(c);
+    try {
+      lock = (ReentrantLock) lockField.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public CopyOnWriteArrayList2(E[] toCopyIn) {
+    super(toCopyIn);
+    try {
+      lock = (ReentrantLock) lockField.get(this);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Removes the element at the tail of this list.
+   * Returns the element that was removed from the list.
+   *
+   * @throws NoSuchElementException if this list is empty
+   */
+  public E removeLast() {
+    final ReentrantLock lock = this.lock;
+    lock.lock();
+    try {
+      Object[] elements = (Object[]) getArrayMethod.invoke(this);
+      int len = elements.length;
+      if (len == 0) {
+        throw new NoSuchElementException("Empty list");
+      }
+      @SuppressWarnings("unchecked")
+      E oldValue = (E) elements[len - 1];
+      setArrayMethod.invoke(this, Arrays.copyOf(elements, len - 1));
+      return oldValue;
+    } catch (ReflectiveOperationException e) {
+      // Wrap only reflection failures; the NoSuchElementException above must
+      // propagate to the caller as documented.
+      throw new RuntimeException(e);
+    } finally {
+      lock.unlock();
+    }
+  }
+}
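
An illustrative driver for the new list, not part of the patch: it exercises the head insert that backs addFirst(...), plain iteration, and the reflective removeLast(), including its empty-list contract.

    import java.util.NoSuchElementException;
    import org.apache.hadoop.hbase.util.CopyOnWriteArrayList2;

    public class CopyOnWriteArrayList2Demo {
      public static void main(String[] args) {
        CopyOnWriteArrayList2<String> pipeline = new CopyOnWriteArrayList2<String>();
        pipeline.add("oldest");
        pipeline.add(0, "youngest"); // head insert, as in addFirst()

        System.out.println(pipeline.removeLast()); // prints "oldest"
        System.out.println(pipeline);              // prints [youngest]

        pipeline.clear();
        try {
          pipeline.removeLast();
        } catch (NoSuchElementException expected) {
          System.out.println("empty pipeline rejected"); // documented behavior
        }
      }
    }
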
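For comparison only, a hypothetical reflection-free variant (TailRemovableList is not in the patch): if all writers are serialized externally, as the synchronized (pipeline) blocks in CompactionPipeline suggest, the tail can be removed through the public API alone.

    import java.util.NoSuchElementException;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Correct only when writers are serialized externally (e.g. inside
    // synchronized (pipeline)), because size() and remove(int) are two
    // separate operations here rather than one atomic step.
    public class TailRemovableList<E> extends CopyOnWriteArrayList<E> {
      private static final long serialVersionUID = 1L;

      public E removeLast() {
        int len = size();
        if (len == 0) {
          throw new NoSuchElementException("Empty list");
        }
        return remove(len - 1); // copies the array without the last element
      }
    }

The reflection in CopyOnWriteArrayList2 buys an atomic check-and-remove at the price of depending on private JDK internals (lock, getArray, setArray), which a JDK upgrade can break; the sketch above stays within the supported API but pushes the atomicity obligation onto callers.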