commit cadcdfb0c7f3cce285bf494e9da3148c220b02e0
Author: Todd Lipcon
Date:   Fri May 25 10:55:23 2012 -0700

    some bitmap index hacking

diff --git pom.xml pom.xml
index 18de6ae..51e59be 100644
--- pom.xml
+++ pom.xml
@@ -279,6 +279,26 @@
+    <repository>
+      <id>cdh.repo</id>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <name>Cloudera Repositories</name>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+    <repository>
+      <id>cdh.snapshots.repo</id>
+      <url>https://repository.cloudera.com/artifactory/libs-snapshot-local</url>
+      <name>Cloudera Snapshots Repository</name>
+      <snapshots>
+        <enabled>true</enabled>
+      </snapshots>
+      <releases>
+        <enabled>false</enabled>
+      </releases>
+    </repository>
     <repository>
       <id>java.net</id>
       <name>Java.Net</name>
       <url>http://download.java.net/maven/2/</url>
@@ -1026,6 +1046,7 @@
     <guava.version>11.0.2</guava.version>
     <jackson.version>1.8.8</jackson.version>
     <jasper.version>5.5.23</jasper.version>
+    <javaewah.version>0.5.0</javaewah.version>
     <jaxb-api.version>2.1</jaxb-api.version>
     <jetty.version>6.1.26</jetty.version>
     <jetty.jspapi.version>6.1.14</jetty.jspapi.version>
@@ -1093,6 +1114,11 @@
       <version>${guava.version}</version>
     </dependency>
     <dependency>
+      <groupId>com.googlecode.javaewah</groupId>
+      <artifactId>JavaEWAH</artifactId>
+      <version>${javaewah.version}</version>
+    </dependency>
+    <dependency>
       <groupId>commons-cli</groupId>
       <artifactId>commons-cli</artifactId>
       <version>${commons-cli.version}</version>
@@ -1569,7 +1595,7 @@
         <hadoop.version>1.0.3</hadoop.version>
-        <slf4j.version>1.4.3</slf4j.version>
+        <slf4j.version>1.6.4</slf4j.version>
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
index e3c4fe4..13c3740 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/BlockType.java
@@ -76,6 +76,10 @@ public enum BlockType {
   /** Delete Family Bloom filter metadata, version 2 */
   DELETE_FAMILY_BLOOM_META("DFBLMET2", BlockCategory.BLOOM),
+
+  /** Bitmap index data */
+  // TODO: is INDEX right?
+  BITMAP_INDEX("BMINDEX1", BlockCategory.INDEX),
 
   // Trailer
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 8e78a60..823b4c5 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.HbaseMapWritable;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.regionserver.BitmapIndexWriter;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.SchemaAware;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -307,6 +308,11 @@ public class HFile {
      * HFile V2.
      */
     void addDeleteFamilyBloomFilter(BloomFilterWriter bfw) throws IOException;
+
+    /**
+     * Store bitmap index information in the file, only supported in HFile v2.
+     */
+    void addBitmapIndex(BitmapIndexWriter biw) throws IOException;
   }
 
   /**
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 9289af2..2d58bed 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -1053,6 +1053,11 @@ public class HFileBlockIndex {
       return totalBlockUncompressedSize;
     }
 
+    @Override
+    public void dataBlockWritten(byte[] firstKeyInBlock,
+        long lastDataBlockOffset, int onDiskSize) {
+    }
+
   }
 
   /**
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
index a948907..6d7317c 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV1.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
+import org.apache.hadoop.hbase.regionserver.BitmapIndexWriter;
 import org.apache.hadoop.hbase.regionserver.MemStore;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.ChecksumType;
@@ -415,6 +416,11 @@ public class HFileWriterV1 extends AbstractHFileWriter {
       throws IOException {
     throw new IOException("Delete Bloom filter is not supported in HFile V1");
   }
+
+  @Override
+  public void addBitmapIndex(BitmapIndexWriter biw) throws IOException {
+    throw new IOException("Bitmap index is not supported in HFile V1");
+  }
 
   /**
    * Write out the index in the version 1 format. This conforms to the legacy
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
index d78badb..73ae860 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/HFileWriterV2.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyComparator;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock.BlockWritable;
+import org.apache.hadoop.hbase.regionserver.BitmapIndexWriter;
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.BloomFilterWriter;
@@ -188,12 +189,16 @@ public class HFileWriterV2 extends AbstractHFileWriter {
     dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
         onDiskSize);
     totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
-
+
     HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
 
     if (cacheConf.shouldCacheDataOnWrite()) {
       doCacheOnWrite(lastDataBlockOffset);
     }
+
+    for (InlineBlockWriter ibw : inlineBlockWriters) {
+      ibw.dataBlockWritten(firstKeyInBlock, lastDataBlockOffset, onDiskSize);
+    }
   }
 
   /** Gives inline block writers an opportunity to contribute blocks. */
@@ -475,4 +480,18 @@ public class HFileWriterV2 extends AbstractHFileWriter {
     });
   }
 
+  @Override
+  public void addBitmapIndex(final BitmapIndexWriter biw) throws IOException {
+    additionalLoadOnOpenData.add(new BlockWritable() {
+      @Override
+      public void writeToBlock(DataOutput out) throws IOException {
+        biw.writeTo(out);
+      }
+
+      @Override
+      public BlockType getBlockType() {
+        return BlockType.BITMAP_INDEX;
+      }
+    });
+  }
 }
diff --git src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java
index 7068224..fbac720 100644
--- src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java
+++ src/main/java/org/apache/hadoop/hbase/io/hfile/InlineBlockWriter.java
@@ -71,4 +71,7 @@ public interface InlineBlockWriter {
    * @return true if inline blocks produced by this writer should be cached
    */
   boolean cacheOnWrite();
+
+  void dataBlockWritten(byte[] firstKeyInBlock, long lastDataBlockOffset,
+      int onDiskSize);
 }
\ No newline at end of file
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/BitmapIndexWriter.java src/main/java/org/apache/hadoop/hbase/regionserver/BitmapIndexWriter.java
new file mode 100644
index 0000000..d5116b5
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/BitmapIndexWriter.java
@@ -0,0 +1,111 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.InlineBlockWriter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+
+import javaewah.EWAHCompressedBitmap;
+import javaewah.IntIterator;
+
+
+public class BitmapIndexWriter implements InlineBlockWriter {
+  private static final Log LOG = LogFactory.getLog(BitmapIndexWriter.class);
+
+  private int dataBlockIndex = 0;
+
+  private ByteArrayCuckooMap<ByteArrayCuckooMap<EWAHCompressedBitmap>> indexes =
+      new ByteArrayCuckooMap<ByteArrayCuckooMap<EWAHCompressedBitmap>>();
+
+  private boolean written = false;
+
+  void add(KeyValue kv) {
+    ByteArrayCuckooMap<EWAHCompressedBitmap> colMap = indexes.get(
+        kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength());
+    if (colMap == null) {
+      colMap = new ByteArrayCuckooMap<EWAHCompressedBitmap>();
+      indexes.put(kv.getBuffer(), kv.getQualifierOffset(), kv.getQualifierLength(),
+          colMap);
+    }
+
+    EWAHCompressedBitmap bs = colMap.get(
+        kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+    if (bs == null) {
+      bs = new EWAHCompressedBitmap();
+      colMap.put(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength(), bs);
+    }
+    bs.set(dataBlockIndex);
+  }
+
+  void finishBlock() {
+    dataBlockIndex++;
+  }
+
+  public void writeTo(DataOutput out) {
+    LOG.info("Would have written bm index: " + dumpString());
+  }
+
+  private String dumpString() {
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<byte[], ByteArrayCuckooMap<EWAHCompressedBitmap>> e :
+         indexes.entryIterable()) {
+
+      byte[] col = e.getKey();
+      sb.append(Bytes.toStringBinary(col))
+        .append(":\n");
+      for (Map.Entry<byte[], EWAHCompressedBitmap> e2 :
+           e.getValue().entryIterable()) {
+        byte[] val = e2.getKey();
+        sb.append(" =")
+          .append(Bytes.toStringBinary(val))
+          .append(": ");
+        IntIterator iter = e2.getValue().intIterator();
+        while (iter.hasNext()) {
+          sb.append(iter.next());
+          sb.append(" ");
+        }
+        sb.append("\n");
+      }
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public boolean shouldWriteBlock(boolean closing) {
+    return closing && !written;
+  }
+
+  @Override
+  public void writeInlineBlock(DataOutput out) throws IOException {
+    writeTo(out);
+    written = true;
+  }
+
+  @Override
+  public void blockWritten(long offset, int onDiskSize, int uncompressedSize) {
+  }
+
+  @Override
+  public void dataBlockWritten(byte[] firstKeyInBlock,
+      long lastDataBlockOffset, int onDiskSize) {
+    dataBlockIndex++;
+  }
+
+  @Override
+  public BlockType getInlineBlockType() {
+    return BlockType.BITMAP_INDEX;
+  }
+
+  @Override
+  public boolean cacheOnWrite() {
+    return false;
+  }
+}
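Editor's note (not part of the commit): BitmapIndexWriter above accumulates, per store file, a map of column qualifier -> value -> EWAH compressed bitmap, where each set bit is the ordinal of a data block containing at least one cell with that qualifier/value pair; writeTo() is still a logging stub. Below is a minimal sketch of how a read path might consume such an index once it is persisted: an equality predicate becomes a bitmap lookup, and a conjunction of predicates becomes a bitmap AND over data-block ordinals. The BitmapIndexReader class, its String-keyed maps and the blocksMatching/blocksMatchingBoth/bitmapFor methods are invented for illustration; only the JavaEWAH calls (and(), intIterator()) are existing library API.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javaewah.EWAHCompressedBitmap;
import javaewah.IntIterator;

// Hypothetical read-side counterpart to BitmapIndexWriter: qualifier -> value ->
// bitmap of data-block ordinals, using plain String keys for brevity.
public class BitmapIndexReader {
  private final Map<String, Map<String, EWAHCompressedBitmap>> index;

  public BitmapIndexReader(Map<String, Map<String, EWAHCompressedBitmap>> index) {
    this.index = index;
  }

  /** Ordinals of data blocks that may contain rows where qualifier == value. */
  public List<Integer> blocksMatching(String qualifier, String value) {
    List<Integer> blocks = new ArrayList<Integer>();
    for (IntIterator it = bitmapFor(qualifier, value).intIterator(); it.hasNext(); ) {
      blocks.add(it.next());   // each set bit is a data-block ordinal
    }
    return blocks;
  }

  /** Candidate blocks for a conjunction (q1 == v1 AND q2 == v2). */
  public EWAHCompressedBitmap blocksMatchingBoth(String q1, String v1,
                                                 String q2, String v2) {
    // AND of two compressed bitmaps; only blocks containing both values survive.
    return bitmapFor(q1, v1).and(bitmapFor(q2, v2));
  }

  private EWAHCompressedBitmap bitmapFor(String qualifier, String value) {
    Map<String, EWAHCompressedBitmap> colMap = index.get(qualifier);
    EWAHCompressedBitmap bm = (colMap == null) ? null : colMap.get(value);
    // Unindexed column or unseen value: no candidate blocks at all.
    return (bm == null) ? new EWAHCompressedBitmap() : bm;
  }
}

With a reader along these lines, a scanner could skip every data block whose ordinal is absent from the resulting bitmap, much as Bloom filters already let it skip whole store files.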
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/ByteArrayCuckooMap.java src/main/java/org/apache/hadoop/hbase/regionserver/ByteArrayCuckooMap.java
new file mode 100644
index 0000000..0f218c7
--- /dev/null
+++ src/main/java/org/apache/hadoop/hbase/regionserver/ByteArrayCuckooMap.java
@@ -0,0 +1,274 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Hash;
+
+import com.google.common.base.Preconditions;
+
+@SuppressWarnings("unchecked")
+public class ByteArrayCuckooMap<T> {
+  private final Hash hash;
+  private int seeds[] = new int[2];
+
+  private Random r = new Random();
+
+  private byte[][] keys;
+  private T[] values;
+
+  private static final int DEFAULT_INITIAL_POW2 = 8;
+  private static final int MAX_INSERT_LOOP = 64;
+
+  // half of keys.length
+  private int slotsPerSubtable;
+
+  private int numElements;
+  private long changeCount;
+
+
+  public ByteArrayCuckooMap() {
+    this(DEFAULT_INITIAL_POW2);
+  }
+
+  private ByteArrayCuckooMap(int pow2) {
+    Preconditions.checkArgument(pow2 < 30 && pow2 > 0);
+    this.slotsPerSubtable = 1 << pow2;
+    int arraySize = slotsPerSubtable << 1;
+
+    this.hash = Hash.getInstance(Hash.MURMUR_HASH);
+    this.keys = new byte[arraySize][];
+    this.values = (T[]) new Object[arraySize];
+    repickSeeds();
+  }
+
+  private void repickSeeds() {
+    seeds[0] = r.nextInt();
+    seeds[1] = r.nextInt();
+  }
+
+  public void put(byte[] key, T val) {
+    put(key, 0, key.length, val);
+  }
+
+  public void put(byte[] key, int off, int len,
+      T val) {
+    // TODO: currentSize is a bad value for initial attempt count
+    put(key, off, len, true, val);
+  }
+
+  private void put(byte[] key, int off, int len,
+      boolean copyKey, T val) {
+
+    while (true) {
+      for (int i = 0; i < MAX_INSERT_LOOP; i++) {
+        int slot;
+        if ((i & 1) == 0) {
+          slot = slot1(key, off, len);
+        } else {
+          slot = slot2(key, off, len);
+        }
+
+        if (keys[slot] != null) {
+          byte[] keyInSlot = keys[slot];
+          if (Bytes.equals(key, off, len,
+              keyInSlot, 0, keyInSlot.length)) {
+            // Same key in slot, just update value.
+            values[slot] = val;
+            changeCount++;
+            return;
+          } else {
+            // need to rejigger cuckoos
+            byte[] saveKey = keys[slot];
+            T saveVal = values[slot];
+
+            keys[slot] = maybeCopy(key, off, len, copyKey);
+            values[slot] = val;
+
+            key = saveKey;
+            off = 0;
+            len = saveKey.length;
+            val = saveVal;
+            copyKey = false;
+            continue;
+          }
+        } else {
+          // Nothing in slot
+          keys[slot] = maybeCopy(key, off, len, copyKey);
+          values[slot] = val;
+          numElements++;
+          changeCount++;
+          return;
+        }
+        // should not get here
+      }
+      rehash();
+    }
+  }
+
+  private void rehash() {
+    float lf = loadFactor();
+    System.err.println("Need to rehash: " +
+        "numElements=" + numElements +
+        " slots=" + slotsPerSubtable +
+        " lf=" + lf);
+    if (lf > 0.5f) {
+      rehash(slotsPerSubtable * 2);
+    } else {
+      rehash(slotsPerSubtable);
+    }
+  }
+
+  private float loadFactor() {
+    return (float)numElements / (float)keys.length;
+  }
+
+  private void rehash(int newSlotsPerSubtable) {
+    int newArraySize = newSlotsPerSubtable << 1;
+
+    byte[][] oldKeys = keys;
+    T[] oldVals = values;
+
+    keys = new byte[newArraySize][];
+    values = (T[]) new Object[newArraySize];
+    slotsPerSubtable = newSlotsPerSubtable;
+    repickSeeds();
+
+    numElements = 0;
+    for (int i = 0; i < oldKeys.length; i++) {
+      if (oldKeys[i] != null) {
+        put(oldKeys[i], 0, oldKeys[i].length, false, oldVals[i]);
+      }
+    }
+  }
+
+  public T get(byte[] key) {
+    return get(key, 0, key.length);
+  }
+
+
+  public T get(byte[] key, int off, int len) {
+    int slot = slot1(key, off, len);
+    if (keys[slot] != null) {
+      byte[] keyInSlot = keys[slot];
+      if (Bytes.equals(key, off, len,
+          keyInSlot, 0, keyInSlot.length)) {
+        return values[slot];
+      }
+    }
+
+    slot = slot2(key, off, len);
+    if (keys[slot] != null) {
+      byte[] keyInSlot = keys[slot];
+      if (Bytes.equals(key, off, len,
+          keyInSlot, 0, keyInSlot.length)) {
+        return values[slot];
+      }
+    }
+
+    return null;
+  }
+
+  private byte[] maybeCopy(byte[] key, int off, int len, boolean copy) {
+    if (!copy) {
+      assert off == 0 && len == key.length;
+      return key;
+    } else {
+      return Arrays.copyOfRange(key, off, off + len);
+    }
+  }
+
+  private int slot1(byte[] key, int offset, int length) {
+    return hash1(key, offset, length) & (slotsPerSubtable - 1);
+  }
+
+  private int slot2(byte[] key, int offset, int length) {
+    return hash2(key, offset, length) & (slotsPerSubtable - 1) | slotsPerSubtable;
+  }
+
+  private int hash1(byte[] key, int offset, int length) {
+    return hash.hash(key, offset, length, seeds[0]);
+  }
+  private int hash2(byte[] key, int offset, int length) {
+    return hash.hash(key, offset, length, seeds[1]);
+  }
+
+
+  public Iterable<Entry<byte[], T>> entryIterable() {
+    return new Iterable<Entry<byte[], T>>() {
+      @Override
+      public Iterator<Entry<byte[], T>> iterator() {
+        return new EntryIterator();
+      }
+    };
+  }
+
+  private class EntryIterator implements Iterator<Entry<byte[], T>> {
+    private final long initialChangeCount;
+    private int i = 0;
+
+    public EntryIterator() {
+      this.initialChangeCount = changeCount;
+      advanceIdx();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return i < keys.length;
+    }
+
+    @Override
+    public Entry<byte[], T> next() {
+      Preconditions.checkState(hasNext());
+      if (changeCount != initialChangeCount) {
+        throw new ConcurrentModificationException();
+      }
+
+
+      final byte[] key = keys[i];
+      final T val = values[i];
+      assert key != null;
+
+      Entry<byte[], T> ret = new Map.Entry<byte[], T>() {
+        @Override
+        public byte[] getKey() {
+          return key;
+        }
+
+        @Override
+        public T getValue() {
+          return val;
+        }
+
+        @Override
+        public T setValue(T value) {
+          throw new UnsupportedOperationException();
+        }
+      };
+      i++;
+      advanceIdx();
+      return ret;
+    }
+
+    private void advanceIdx() {
+      while (i < keys.length && keys[i] == null) {
+        i++;
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+
+
+  }
+
+
+
+}
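Editor's note (not part of the commit): since BitmapIndexWriter.writeTo() only logs a dump, the commit does not yet define an on-disk layout for the BITMAP_INDEX block. Below is one possible framing for a single (value, bitmap) entry, assumed purely for illustration: length-prefixed value bytes, a count of set bits, then the data-block ordinals as plain ints. A real implementation would more likely persist the compressed EWAH words directly rather than expand the positions. EWAHCompressedBitmap.set(), cardinality() and intIterator() are real JavaEWAH methods; set() must be called with ascending positions, which the round trip below respects. The BitmapEntryCodec class and its layout are made up for this sketch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

import javaewah.EWAHCompressedBitmap;
import javaewah.IntIterator;

// Hypothetical framing for one (value -> bitmap) entry of a BITMAP_INDEX block.
public final class BitmapEntryCodec {

  static void writeEntry(byte[] value, EWAHCompressedBitmap bm, DataOutput out)
      throws IOException {
    out.writeInt(value.length);                // length-prefixed value bytes
    out.write(value);
    out.writeInt(bm.cardinality());            // number of data blocks containing the value
    for (IntIterator it = bm.intIterator(); it.hasNext(); ) {
      out.writeInt(it.next());                 // block ordinals, ascending
    }
  }

  static byte[] readValue(DataInput in) throws IOException {
    byte[] value = new byte[in.readInt()];
    in.readFully(value);
    return value;
  }

  static EWAHCompressedBitmap readBitmap(DataInput in) throws IOException {
    EWAHCompressedBitmap bm = new EWAHCompressedBitmap();
    int bits = in.readInt();
    for (int i = 0; i < bits; i++) {
      bm.set(in.readInt());                    // positions were written in ascending order
    }
    return bm;
  }

  // Round trip: the value "red" occurs in data blocks 0, 3 and 7.
  public static void main(String[] args) throws IOException {
    EWAHCompressedBitmap bm = new EWAHCompressedBitmap();
    bm.set(0);
    bm.set(3);
    bm.set(7);

    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    writeEntry("red".getBytes("UTF-8"), bm, new DataOutputStream(buf));

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
    System.out.println(new String(readValue(in), "UTF-8"));          // prints: red
    for (IntIterator it = readBitmap(in).intIterator(); it.hasNext(); ) {
      System.out.print(it.next() + " ");                             // prints: 0 3 7
    }
  }
}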
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index fefd42a..6df8914 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -911,6 +911,9 @@ public class StoreFile extends SchemaConfigured {
     private final BloomType bloomType;
     private byte[] lastBloomKey;
     private int lastBloomKeyOffset, lastBloomKeyLen;
+
+    private final BitmapIndexWriter bitmapIndexWriter;
+
     private KVComparator kvComparator;
     private KeyValue lastKv = null;
     private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
@@ -998,6 +1001,10 @@ public class StoreFile extends SchemaConfigured {
       LOG.info("Delete Family Bloom filter type for " + path + ": "
           + deleteFamilyBloomFilterWriter.getClass().getSimpleName());
     }
+
+    bitmapIndexWriter = new BitmapIndexWriter();
+    writer.addInlineBlockWriter(bitmapIndexWriter);
+
     this.checksumType = checksumType;
     this.bytesPerChecksum = bytesPerChecksum;
   }
@@ -1141,10 +1148,21 @@ public class StoreFile extends SchemaConfigured {
         }
       }
     }
+
+    private void appendBitmapIndexer(final KeyValue kv) {
+      if (bitmapIndexWriter == null) return;
+
+      boolean newKey = lastKv == null ||
+          ! kvComparator.matchingRowColumn(kv, lastKv);
+      if (newKey) {
+        this.bitmapIndexWriter.add(kv);
+      }
+    }
 
     public void append(final KeyValue kv) throws IOException {
       appendGeneralBloomfilter(kv);
       appendDeleteFamilyBloomFilter(kv);
+      appendBitmapIndexer(kv);
       writer.append(kv);
       trackTimestamps(kv);
     }
@@ -1206,6 +1224,13 @@ public class StoreFile extends SchemaConfigured {
       return hasDeleteFamilyBloom;
     }
+
+    private boolean closeBitmapIndex() throws IOException {
+      if (bitmapIndexWriter == null) return false;
+
+      writer.addBitmapIndex(bitmapIndexWriter);
+      return true;
+    }
 
     public void close() throws IOException {
       // Save data block encoder metadata in the file info.
@@ -1213,6 +1238,7 @@ public class StoreFile extends SchemaConfigured {
 
       boolean hasGeneralBloom = this.closeGeneralBloomFilter();
       boolean hasDeleteFamilyBloom = this.closeDeleteFamilyBloomFilter();
+      boolean hasBitmapIndex = this.closeBitmapIndex();
 
       writer.close();
 
@@ -1221,7 +1247,9 @@ public class StoreFile extends SchemaConfigured {
       StoreFile.LOG.info((hasGeneralBloom ? "" : "NO ") + "General Bloom and "
          + (hasDeleteFamilyBloom ? "" : "NO ") + "DeleteFamily"
          + " was added to HFile (" + getPath() + ") ");
-
+      if (hasBitmapIndex) {
+        StoreFile.LOG.info("bitmap index added to HFile " + getPath());
+      }
     }
 
     public void appendFileInfo(byte[] key, byte[] value) throws IOException {
diff --git src/test/java/org/apache/hadoop/hbase/regionserver/TestByteArrayCuckooMap.java src/test/java/org/apache/hadoop/hbase/regionserver/TestByteArrayCuckooMap.java
new file mode 100644
index 0000000..e4fa8b8
--- /dev/null
+++ src/test/java/org/apache/hadoop/hbase/regionserver/TestByteArrayCuckooMap.java
@@ -0,0 +1,49 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import static org.junit.Assert.*;
+
+public class TestByteArrayCuckooMap {
+  @Test
+  public void testPut() {
+    ByteArrayCuckooMap<String> m = new ByteArrayCuckooMap<String>();
+    m.put(Bytes.toBytes("hello"), "hello");
+    assertEquals("hello", m.get(Bytes.toBytes("hello")));
+    assertEquals("hello",
+        m.get(Bytes.toBytes("xhellox"), 1, 5));
+    m.put(Bytes.toBytes("xhellox"), 1, 5, "hello2");
+    assertEquals("hello2", m.get(Bytes.toBytes("hello")));
+    assertEquals("hello2",
+        m.get(Bytes.toBytes("xhellox"), 1, 5));
+  }
+
+  @Test
+  public void testLoad() {
+    ByteArrayCuckooMap<String> m = new ByteArrayCuckooMap<String>();
+
+    for (int i = 0; i < 20000; i++) {
+      String s = String.valueOf(i);
+      byte[] b = Bytes.toBytes(s);
+      m.put(b, s);
+    }
+
+    for (int i = 0; i < 20000; i++) {
+      String s = String.valueOf(i);
+      byte[] b = Bytes.toBytes(s);
+      assertEquals(s, m.get(b));
+    }
+
+    Set<String> set = Sets.newHashSet();
+    for (Map.Entry<byte[], String> e : m.entryIterable()) {
+      set.add(e.getValue());
+    }
+    assertEquals(20000, set.size());
+  }
+}
diff --git src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterWriter.java src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterWriter.java
index ce04be8..2d42fb8 100644
--- src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterWriter.java
+++ src/main/java/org/apache/hadoop/hbase/util/CompoundBloomFilterWriter.java
@@ -276,4 +276,9 @@ public class CompoundBloomFilterWriter extends CompoundBloomFilterBase
     return cacheOnWrite;
   }
 
+  @Override
+  public void dataBlockWritten(byte[] firstKeyInBlock,
+      long lastDataBlockOffset, int onDiskSize) {
+  }
+
 }