diff --git a/src/java/org/apache/hadoop/hbase/io/HalfHFileReader.java b/src/java/org/apache/hadoop/hbase/io/HalfHFileReader.java
index ab15477..e06b169 100644
--- a/src/java/org/apache/hadoop/hbase/io/HalfHFileReader.java
+++ b/src/java/org/apache/hadoop/hbase/io/HalfHFileReader.java
@@ -51,7 +51,7 @@ public class HalfHFileReader extends HFile.Reader {
   final boolean top;
   // This is the key we split around.  Its the first possible entry on a row:
   // i.e. empty column and a timestamp of LATEST_TIMESTAMP.
-  final byte [] splitkey;
+  protected final byte [] splitkey;
 
   /**
    * @param fs
@@ -83,36 +83,50 @@ public class HalfHFileReader extends HFile.Reader {
     final HFileScanner s = super.getScanner();
     return new HFileScanner() {
       final HFileScanner delegate = s;
+      public boolean atEnd = false;
 
       public ByteBuffer getKey() {
+        if (atEnd) return null;
         return delegate.getKey();
       }
 
       public String getKeyString() {
+        if (atEnd) return null;
+
        return delegate.getKeyString();
       }
 
       public ByteBuffer getValue() {
+        if (atEnd) return null;
+
        return delegate.getValue();
       }
 
       public String getValueString() {
+        if (atEnd) return null;
+
        return delegate.getValueString();
       }
 
       public KeyValue getKeyValue() {
+        if (atEnd) return null;
+
        return delegate.getKeyValue();
       }
 
       public boolean next() throws IOException {
+        if (atEnd) return false;
+
        boolean b = delegate.next();
        if (!b) {
          return b;
        }
+        // constrain the bottom.
        if (!top) {
          ByteBuffer bb = getKey();
          if (getComparator().compare(bb.array(), bb.arrayOffset(), bb.limit(),
              splitkey, 0, splitkey.length) >= 0) {
+            atEnd = true;
            return false;
          }
        }
@@ -151,7 +165,7 @@ public class HalfHFileReader extends HFile.Reader {
       }
       return true;
     }
-
+
     boolean b = delegate.seekTo();
     if (!b) {
       return b;
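A note on the HalfHFileReader change above: the new atEnd flag latches once a bottom-half scanner walks past the split key, so every accessor (not just next()) stops returning entries that belong to the top half. A minimal standalone sketch of the same guard pattern; the KVScanner and SplitGuardScanner names are hypothetical and not part of this patch:

    import java.util.Comparator;

    interface KVScanner {
      byte [] key();     // current key, or null if exhausted
      boolean next();    // false when no more entries
    }

    // Delegating scanner that latches atEnd once the delegate reaches an
    // exclusive upper bound, mirroring the HalfHFileReader change.
    final class SplitGuardScanner implements KVScanner {
      private final KVScanner delegate;
      private final byte [] splitKey;             // exclusive upper bound
      private final Comparator<byte []> comparator;
      private boolean atEnd = false;              // latched, never reset

      SplitGuardScanner(KVScanner delegate, byte [] splitKey,
          Comparator<byte []> comparator) {
        this.delegate = delegate;
        this.splitKey = splitKey;
        this.comparator = comparator;
      }

      public byte [] key() {
        // guard the accessor too, so a caller that ignores next()'s return
        // value still cannot read an entry past the split point
        return atEnd ? null : delegate.key();
      }

      public boolean next() {
        if (atEnd || !delegate.next()) {
          return false;
        }
        if (comparator.compare(delegate.key(), splitKey) >= 0) {
          atEnd = true;  // crossed the split point; stop for good
          return false;
        }
        return true;
      }
    }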
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/DeleteCompare.java b/src/java/org/apache/hadoop/hbase/regionserver/DeleteCompare.java
index 3e41983..a66aa04 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/DeleteCompare.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/DeleteCompare.java
@@ -1,3 +1,23 @@
+/*
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.KeyValue;
@@ -69,7 +89,6 @@ public class DeleteCompare {
     if(res > 0) {
       return DeleteCode.DONE;
     } else if(res < 0){
-      System.out.println("SKIPPING ROW");
       return DeleteCode.SKIP;
     }
 
@@ -113,7 +132,6 @@ public class DeleteCompare {
       }
       return DeleteCode.DONE;
     } else {
-      System.out.println("SKIPPING TS");
       return DeleteCode.SKIP;
     }
   }
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java b/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
index fa9775a..3a7b747 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/MinorCompactingStoreScanner.java
@@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import java.util.List;
@@ -33,22 +35,18 @@ import java.io.IOException;
  * and optionally the memcache-snapshot.
  */
 public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
-  private QueryMatcher matcher;
   private KeyValueHeap heap;
-
+  private ScanDeleteTracker deleteTracker;
+  private KeyValue.KVComparator comparator;
 
   MinorCompactingStoreScanner(Store store, KeyValueScanner [] scanners) {
-    Scan scan = new Scan();
-
-    // No max version, no ttl matching, start at first row, all columns.
-    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
-        null, Long.MAX_VALUE, store.comparator.getRawComparator(),
-        store.versionsToReturn(Integer.MAX_VALUE));
-
+    comparator = store.comparator;
+    deleteTracker = new ScanDeleteTracker(store.comparator.getRawComparator());
+    KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
     for (KeyValueScanner scanner : scanners ) {
-      scanner.seek(matcher.getStartKey());
+      scanner.seek(firstKv);
     }
     heap = new KeyValueHeap(scanners, store.comparator);
 
@@ -56,13 +54,12 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
 
   MinorCompactingStoreScanner(String cfName, KeyValue.KVComparator comparator,
                               KeyValueScanner [] scanners) {
-    Scan scan = new Scan();
-    matcher = new ScanQueryMatcher(scan, Bytes.toBytes(cfName),
-        null, Long.MAX_VALUE, comparator.getRawComparator(),
-        Integer.MAX_VALUE);
+    this.comparator = comparator;
+    deleteTracker = new ScanDeleteTracker(comparator.getRawComparator());
+    KeyValue firstKv = KeyValue.createFirstOnRow(HConstants.EMPTY_START_ROW);
     for (KeyValueScanner scanner : scanners ) {
-      scanner.seek(matcher.getStartKey());
+      scanner.seek(firstKv);
     }
     heap = new KeyValueHeap(scanners, comparator);
 
@@ -82,32 +79,94 @@ public class MinorCompactingStoreScanner implements KeyValueScanner, InternalScanner {
     throw new UnsupportedOperationException("Can't seek a MinorCompactingStoreScanner");
   }
 
+  /**
+   * High performance merge scan.
+   * @param writer
+   * @return
+   * @throws IOException
+   */
+  public boolean next(HFile.Writer writer) throws IOException {
+    KeyValue row = heap.peek();
+    if (row == null) {
+      close();
+      return false;
+    }
+    // between rows.
+    deleteTracker.reset();
+
+    KeyValue kv;
+    while ((kv = heap.peek()) != null) {
+      // check to see if this is a different row
+      if (comparator.compareRows(row, kv) != 0) {
+        // reached next row
+        return true;
+      }
+
+      // if delete type, output no matter what:
+      if (kv.getType() != KeyValue.Type.Put.getCode()) {
+        deleteTracker.add(kv.getBuffer(),
+            kv.getQualifierOffset(),
+            kv.getQualifierLength(),
+            kv.getTimestamp(),
+            kv.getType());
+
+        writer.append(heap.next());
+        continue;
+      }
+
+      if (deleteTracker.isDeleted(kv.getBuffer(),
+          kv.getQualifierOffset(),
+          kv.getQualifierLength(),
+          kv.getTimestamp())) {
+        heap.next();
+        continue;
+      }
+
+      writer.append(heap.next());
+    }
+    close();
+    return false;
+  }
+
   @Override
   public boolean next(List<KeyValue> results) throws IOException {
-    KeyValue peeked = heap.peek();
-    if (peeked == null) {
+    KeyValue row = heap.peek();
+    if (row == null) {
       close();
       return false;
     }
-    matcher.setRow(peeked.getRow());
+
+    // between rows.
+    deleteTracker.reset();
+
     KeyValue kv;
     while ((kv = heap.peek()) != null) {
+      // check to see if this is a different row
+      if (comparator.compareRows(row, kv) != 0) {
+        // reached next row
+        return true;
+      }
+
       // if delete type, output no matter what:
-      if (kv.getType() != KeyValue.Type.Put.getCode())
-        results.add(kv);
-
-      switch (matcher.match(kv)) {
-        case INCLUDE:
-          results.add(heap.next());
-          continue;
-        case DONE:
-          if (results.isEmpty()) {
-            matcher.setRow(heap.peek().getRow());
-            continue;
-          }
-          return true;
+      if (kv.getType() != KeyValue.Type.Put.getCode()) {
+        deleteTracker.add(kv.getBuffer(),
+            kv.getQualifierOffset(),
+            kv.getQualifierLength(),
+            kv.getTimestamp(),
+            kv.getType());
+
+        results.add(heap.next());
+        continue;
       }
-      heap.next();
+
+      if (deleteTracker.isDeleted(kv.getBuffer(),
+          kv.getQualifierOffset(),
+          kv.getQualifierLength(),
+          kv.getTimestamp())) {
+        heap.next();
+        continue;
+      }
+
+      results.add(heap.next());
     }
     close();
     return false;
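A note on MinorCompactingStoreScanner: next(HFile.Writer) and next(List<KeyValue>) above share one control flow, which replaces the old ScanQueryMatcher-driven loop. Delete markers are always carried into the output and remembered, because a minor compaction may not include every store file a marker applies to; Puts already covered by a marker seen earlier in the same row are dropped. A condensed sketch of that shared loop, assuming the KeyValueHeap/ScanDeleteTracker classes the patch itself uses; the KVSink interface is a hypothetical stand-in for writer.append(kv) or results.add(kv):

    import java.io.IOException;
    import org.apache.hadoop.hbase.KeyValue;

    // Hypothetical sink unifying the two output paths of the patch.
    interface KVSink { void emit(KeyValue kv) throws IOException; }

    class RowMergeSketch {
      // One call drains at most one row from the heap; returns true while
      // more rows remain. The caller closes the scanner when this is false.
      boolean nextRow(KeyValueHeap heap, ScanDeleteTracker deleteTracker,
          KeyValue.KVComparator comparator, KVSink sink) throws IOException {
        KeyValue row = heap.peek();
        if (row == null) return false;      // heap fully drained
        deleteTracker.reset();              // markers never span rows
        KeyValue kv;
        while ((kv = heap.peek()) != null) {
          if (comparator.compareRows(row, kv) != 0) {
            return true;                    // reached the next row
          }
          if (kv.getType() != KeyValue.Type.Put.getCode()) {
            // delete marker: remember it AND write it through, since files
            // the marker shadows may not be part of this compaction
            deleteTracker.add(kv.getBuffer(), kv.getQualifierOffset(),
                kv.getQualifierLength(), kv.getTimestamp(), kv.getType());
            sink.emit(heap.next());
          } else if (deleteTracker.isDeleted(kv.getBuffer(),
              kv.getQualifierOffset(), kv.getQualifierLength(),
              kv.getTimestamp())) {
            heap.next();                    // shadowed by a marker: drop
          } else {
            sink.emit(heap.next());         // live Put: keep
          }
        }
        return false;                       // drained
      }
    }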
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 9962f55..b92a421 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -54,7 +54,6 @@ public class ScanQueryMatcher extends QueryMatcher {
   public ScanQueryMatcher(Scan scan, byte [] family,
       NavigableSet<byte[]> columns, long ttl,
       KeyValue.KeyComparator rowComparator, int maxVersions) {
-    this.row = row;
     this.tr = scan.getTimeRange();
     this.oldestStamp = System.currentTimeMillis() - ttl;
     this.rowComparator = rowComparator;
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/java/org/apache/hadoop/hbase/regionserver/Store.java
index 9f89a9a..00c92ab 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -834,30 +834,43 @@ public class Store implements HConstants {
       scanners[i] = new StoreFileScanner(filesToCompact.get(i).getReader().getScanner());
     }
 
-    InternalScanner scanner;
     if (majorCompaction) {
-      Scan scan = new Scan();
-      scan.setMaxVersions(family.getMaxVersions());
-      // TODO pass in the scanners/store files.
-      scanner = new StoreScanner(this, scan, null);
+      InternalScanner scanner = null;
+
+      try {
+        Scan scan = new Scan();
+        scan.setMaxVersions(family.getMaxVersions());
+        // TODO pass in the scanners/store files.
+        scanner = new StoreScanner(this, scan, null);
+
+        // since scanner.next() can return 'false' but still be delivering data,
+        // we have to use a do/while loop.
+        ArrayList<KeyValue> row = new ArrayList<KeyValue>();
+        boolean more = true;
+        while ( more ) {
+          more = scanner.next(row);
+          // output to writer:
+          for (KeyValue kv : row) {
+            writer.append(kv);
+          }
+          row.clear();
+        }
+      } finally {
+        if (scanner != null)
+          scanner.close();
+      }
     } else {
-      scanner = new MinorCompactingStoreScanner(this, scanners);
-    }
+      MinorCompactingStoreScanner scanner = null;
+      try {
+        scanner = new MinorCompactingStoreScanner(this, scanners);
 
-    // since scanner.next() can return 'false' but still be delivering data,
-    // we have to use a do/while loop.
-    ArrayList<KeyValue> row = new ArrayList<KeyValue>();
-    boolean more = true;
-    while ( more ) {
-      more = scanner.next(row);
-      // output to writer:
-      for (KeyValue kv : row) {
-        writer.append(kv);
+        while ( scanner.next(writer) ) { }
+      } finally {
+        if (scanner != null)
+          scanner.close();
       }
-      row.clear();
     }
-    scanner.close();
   }
 
   /*
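A note on the Store.compact() restructuring: each branch now owns its scanner and releases it in a finally block, so a failed writer.append() can no longer leak an open scanner (try-with-resources did not exist yet on the Java 6 toolchain HBase targeted at the time, hence the explicit finally). The minor path also streams straight into the writer via next(writer), skipping the per-row ArrayList staging the major path still uses. Reduced to its essentials, the shape both branches follow; openScanner() is a hypothetical factory standing in for either constructor:

    MinorCompactingStoreScanner scanner = null;
    try {
      scanner = openScanner();              // may itself throw
      while (scanner.next(writer)) {
        // intentionally empty: next(writer) appends one row per call
      }
    } finally {
      if (scanner != null) scanner.close(); // runs even if append throws
    }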
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 6813329..1cb5847 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -333,7 +333,8 @@ public class StoreFile implements HConstants {
 
     @Override
     public String toString() {
-      return super.toString() + (isTop()? ", half=top": ", half=bottom");
+      return super.toString() + (isTop()? ", half=top": ", half=bottom") +
+        " splitKey: " + KeyValue.keyToString(splitkey);
     }
 
     @Override
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 1f5ea69..d8e9ede 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -40,6 +40,10 @@ class StoreFileScanner implements KeyValueScanner {
   public StoreFileScanner(HFileScanner hfs) {
     this.hfs = hfs;
   }
+
+  public String toString() {
+    return "StoreFileScanner[" + hfs.toString() + ", cur=" + cur + "]";
+  }
 
   public KeyValue peek() {
     return cur;
diff --git a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index e9087c1..a1ec63c 100644
--- a/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/src/test/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -221,6 +222,7 @@ public class TestHRegion extends HBaseTestCase {
     List<KeyValue> kvs = new ArrayList<KeyValue>();
     kvs.add(new KeyValue(row1, fam4, null, null));
 
+
     //testing existing family
     byte [] family = fam2;
     try {
@@ -239,6 +241,55 @@ public class TestHRegion extends HBaseTestCase {
     }
     assertEquals("Family " +new String(family)+ " does exist", true, ok);
   }
+
+  public void testDelete_mixed() throws IOException {
+    byte [] tableName = Bytes.toBytes("testtable");
+    byte [] fam = Bytes.toBytes("info");
+    byte [][] families = {fam};
+    String method = this.getName();
+    initHRegion(tableName, method, families);
+
+    byte [] row = Bytes.toBytes("table_name");
+    // column names
+    byte [] serverinfo = Bytes.toBytes("serverinfo");
+    byte [] splitA = Bytes.toBytes("splitA");
+    byte [] splitB = Bytes.toBytes("splitB");
+
+    // add some data:
+    Put put = new Put(row);
+    put.add(fam, splitA, Bytes.toBytes("reference_A"));
+    region.put(put);
+
+    put = new Put(row);
+    put.add(fam, splitB, Bytes.toBytes("reference_B"));
+    region.put(put);
+
+    put = new Put(row);
+    put.add(fam, serverinfo, Bytes.toBytes("ip_address"));
+    region.put(put);
+
+    // ok now delete a split:
+    Delete delete = new Delete(row);
+    delete.deleteColumns(fam, splitA);
+    region.delete(delete, null, true);
+
+    // assert some things:
+    Get get = new Get(row).addColumn(fam, serverinfo);
+    Result result = region.get(get, null);
+    assertEquals(1, result.size());
+
+    get = new Get(row).addColumn(fam, splitA);
+    result = region.get(get, null);
+    assertEquals(0, result.size());
+
+    get = new Get(row).addColumn(fam, splitB);
+    result = region.get(get, null);
+    assertEquals(1, result.size());
+
+
+
+  }
 
   //Visual test, since the method doesn't return anything
   public void testDelete_CheckTimestampUpdated()
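A note on testDelete_mixed: it pins down the delete semantics the new compaction path must preserve: Delete.deleteColumns(family, qualifier) removes every version of exactly one column, while sibling columns in the same row and family survive. The equivalent client-side interaction, sketched against a hypothetical already-opened HTable for "testtable":

    // 'table' is assumed to be an open HTable for the test table.
    Delete delete = new Delete(Bytes.toBytes("table_name"));
    delete.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("splitA"));
    table.delete(delete);

    // splitA is gone; serverinfo and splitB must still be readable.
    Result r = table.get(new Get(Bytes.toBytes("table_name"))
        .addColumn(Bytes.toBytes("info"), Bytes.toBytes("splitA")));
    assert r.isEmpty();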