diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
index e68e486..eb23881 100644
--- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java
+++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java
@@ -1084,6 +1084,9 @@ public class KeyValue implements Writable, HeapSize {
   public byte [] getFamily() {
     int o = getFamilyOffset();
     int l = getFamilyLength(o);
+    if (l == 0) {
+      return new byte[0];
+    }
     byte [] result = new byte[l];
     System.arraycopy(this.bytes, o, result, 0, l);
     return result;
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressedKeyValue.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressedKeyValue.java
new file mode 100644
index 0000000..5eb7c88
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressedKeyValue.java
@@ -0,0 +1,167 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Compression class for Key/Value pairs before writing them to the WAL.
+ * This is not synchronized, so synchronization should be handled outside.
+ *
+ * The class only compresses and decompresses row keys, family names, and
+ * qualifiers. More/less may be added depending on use patterns.
+ */
+class CompressedKeyValue {
+
+  /**
+   * Decompresses a KeyValue from a DataInput and returns it.
+   *
+   * @param in the DataInput
+   * @param dict the dictionary to use
+   * @return a decompressed KeyValue
+   * @throws IOException
+   */
+  public static KeyValue readFields(DataInput in, WALDictionary dict)
+      throws IOException {
+    int keylength = in.readInt();
+    int vlength = in.readInt();
+    int length = KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE + keylength + vlength;
+
+    byte[] backingArray = new byte[length];
+    int pos = 0;
+    pos = Bytes.putInt(backingArray, pos, keylength);
+    pos = Bytes.putInt(backingArray, pos, vlength);
+
+    // the row
+    pos = readCompressed(backingArray, in, pos, Bytes.SIZEOF_SHORT, dict);
+
+    // family
+    pos = readCompressed(backingArray, in, pos, Bytes.SIZEOF_BYTE, dict);
+
+    // qualifier
+    pos = readCompressed(backingArray, in, pos, 0, dict);
+
+    // the rest
+    in.readFully(backingArray, pos, length - pos);
+
+    return new KeyValue(backingArray);
+  }
+
+  /**
+   * Compresses and writes ourKV to out, a DataOutput.
+   *
+   * @param out the DataOutput
+   * @param ourKV the KV to compress and write
+   * @param dict the dictionary we use for compression
+   * @throws IOException
+   */
+  public static void write(final DataOutput out, KeyValue ourKV,
+      WALDictionary dict) throws IOException {
+    byte[] backingArray = ourKV.getBuffer();
+    int offset = ourKV.getOffset();
+
+    // we first write the KeyValue infrastructure bytes (the key and value
+    // lengths), since these come first
+    out.write(backingArray, offset, KeyValue.KEYVALUE_INFRASTRUCTURE_SIZE);
+
+    // now we write the row; since the row is likely to be repeated, it is
+    // compressed
+    writeCompressed(ourKV.getRow(), out, dict);
+
+    // now the family, if it exists; if it doesn't, we write a 0 length array
+    writeCompressed(ourKV.getFamily(), out, dict);
+
+    // qualifier next
+    writeCompressed(ourKV.getQualifier(), out, dict);
+
+    // now we write the rest uncompressed; the reader recomputes this length
+    // from the key and value lengths, so it is not written to the stream
+    int pos = ourKV.getTimestampOffset();
+    int remainingLength = ourKV.getLength() - (pos - offset);
+    out.write(backingArray, pos, remainingLength);
+  }
+
+  /*
+   * sizeBytes is the width of the length field to rebuild in the backing
+   * array. Oftentimes we do not use an int, because the KeyValue format
+   * represents the length as a byte or a short; 0 means no length field
+   * (the qualifier length is implicit in the key length).
+   */
+  private static int readCompressed(byte[] to, DataInput in, int offset,
+      int sizeBytes, WALDictionary dict) throws IOException {
+    int status = in.readUnsignedByte();
+    int length;
+    int pos = offset;
+    byte[] entry = null;
+
+    if (status == 0) {
+      length = in.readInt();
+    } else {
+      int dictIdx = in.readInt();
+      entry = dict.getEntry(dictIdx);
+      length = entry.length;
+    }
+
+    // we write the length field, sized as the caller requested
+    if (sizeBytes == Bytes.SIZEOF_BYTE) {
+      pos = Bytes.putByte(to, pos, (byte) (length & 0x0000ff));
+    } else if (sizeBytes == Bytes.SIZEOF_SHORT) {
+      pos = Bytes.putShort(to, pos, (short) (length & 0x0000ffff));
+    } else if (sizeBytes == Bytes.SIZEOF_INT) {
+      pos = Bytes.putInt(to, pos, length);
+    }
+
+    // now we copy in the value itself
+    if (status == 0) {
+      // this entry was not in the writer's dictionary, so the writer added
+      // it; add it here as well so both dictionaries stay in sync
+      byte[] arr = new byte[length];
+      in.readFully(arr, 0, length);
+      dict.addEntry(arr);
+      pos = Bytes.putBytes(to, pos, arr, 0, length);
+    } else {
+      pos = Bytes.putBytes(to, pos, entry, 0, length);
+    }
+
+    return pos;
+  }
+
+  private static void writeCompressed(byte[] data, DataOutput out,
+      WALDictionary dict) throws IOException {
+    int dictIdx = dict.findEntry(data);
+    if (dictIdx == -1) {
+      // not in dict
+      out.writeByte(0);
+      out.writeInt(data.length);
+      out.write(data);
+      dict.addEntry(data);
+    } else {
+      // the reader reads the index back with readInt, so write a full int
+      out.writeByte(1);
+      out.writeInt(dictIdx);
+    }
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SimpleDictionary.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SimpleDictionary.java
new file mode 100644
index 0000000..10721bc
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SimpleDictionary.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A simple dictionary, one-way associative, for a proof of concept.
+ */
+class SimpleDictionary implements WALDictionary {
+  final byte[][] lookup;
+  final int size;
+
+  public SimpleDictionary(int size) {
+    lookup = new byte[size][];
+    this.size = size;
+  }
+
+  @Override
+  public byte[] getEntry(int idx) {
+    return lookup[idx];
+  }
+
+  @Override
+  public int findEntry(byte[] data) {
+    int hc = slot(data);
+    if (lookup[hc] == null) {
+      return -1;
+    }
+    if (Bytes.equals(lookup[hc], data)) {
+      return hc;
+    }
+    return -1;
+  }
+
+  @Override
+  public int addEntry(byte[] data) {
+    int hc = slot(data);
+    lookup[hc] = data;
+    return hc;
+  }
+
+  // byte[].hashCode() is identity-based, so hash the array contents
+  // instead, and mask off the sign bit so the modulo is never negative
+  private int slot(byte[] data) {
+    return (Bytes.hashCode(data) & 0x7fffffff) % size;
+  }
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDictionary.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDictionary.java
new file mode 100644
index 0000000..adda6b4
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALDictionary.java
@@ -0,0 +1,7 @@
+package org.apache.hadoop.hbase.regionserver.wal;
+
+interface WALDictionary {
+  public byte[] getEntry(int idx);
+  public int findEntry(byte[] data);
+  public int addEntry(byte[] data);
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index e1117ef..d19f6dd 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -70,12 +70,18 @@ public class WALEdit implements Writable, HeapSize {
 
   private final int VERSION_2 = -1;
+
+  private WALDictionary compression;
 
   private final ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
 
   private NavigableMap<byte[], Integer> scopes;
 
   public WALEdit() {
   }
+
+  public WALEdit(final WALDictionary compression) {
+    this.compression = compression;
+  }
 
   public void add(KeyValue kv) {
     this.kvs.add(kv);
@@ -114,9 +120,13 @@
       // this is new style HLog entry containing multiple KeyValues.
       int numEdits = in.readInt();
       for (int idx = 0; idx < numEdits; idx++) {
+        if (compression != null) {
+          this.add(CompressedKeyValue.readFields(in, compression));
+        } else {
           KeyValue kv = new KeyValue();
           kv.readFields(in);
           this.add(kv);
+        }
       }
       int numFamilies = in.readInt();
       if (numFamilies > 0) {
@@ -131,7 +141,7 @@
       }
     } else {
       // this is an old style HLog entry. The int that we just
-      // read is actually the length of a single KeyValue.
+      // read is actually the length of a single KeyValue
       KeyValue kv = new KeyValue();
       kv.readFields(versionOrLength, in);
       this.add(kv);
@@ -144,7 +154,11 @@
     out.writeInt(kvs.size());
     // We interleave the two lists for code simplicity
    for (KeyValue kv : kvs) {
+      if (compression != null) {
+        CompressedKeyValue.write(out, kv, compression);
+      } else {
       kv.write(out);
+      }
     }
     if (scopes == null) {
       out.writeInt(0);
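
For reference while reviewing: the on-wire format CompressedKeyValue produces for each KeyValue, read straight from the code above (assuming this revision's symmetry fixes, i.e. the dictionary index is always a full int and no separate remaining-length field is written), is:

  int keylength, int vlength    (KeyValue infrastructure, uncompressed)
  row:        1 status byte, then: status == 0 ? int length + raw bytes : int dictIdx
  family:     same encoding as the row
  qualifier:  same encoding as the row
  the rest:   timestamp (8 bytes), type (1 byte), value (vlength bytes), uncompressed

Whenever the writer emits a raw entry (status 0) it adds that entry to its dictionary, and the reader does the same while decompressing, so both sides assign identical indices without ever shipping the dictionary itself.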
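The patch ships no test, so here is a minimal round-trip sketch. The class name CompressedKeyValueDemo and the dictionary size are illustrative, not part of the patch, and the class must sit in org.apache.hadoop.hbase.regionserver.wal because the classes above are package-private:

package org.apache.hadoop.hbase.regionserver.wal;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class CompressedKeyValueDemo {
  public static void main(String[] args) throws Exception {
    // two edits sharing a row and a family, as a WAL batch typically would
    KeyValue kv1 = new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q1"), 1L, Bytes.toBytes("value-1"));
    KeyValue kv2 = new KeyValue(Bytes.toBytes("row-1"), Bytes.toBytes("cf"),
        Bytes.toBytes("q2"), 2L, Bytes.toBytes("value-2"));

    // writer side: the first occurrence of each row/family/qualifier goes
    // out raw and enters the dictionary; repeats go out as an int index
    WALDictionary writeDict = new SimpleDictionary(32768);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(baos);
    CompressedKeyValue.write(out, kv1, writeDict);
    CompressedKeyValue.write(out, kv2, writeDict);
    out.flush();

    // reader side: a fresh dictionary stays in sync because readCompressed
    // adds every raw entry it sees, mirroring the writer
    WALDictionary readDict = new SimpleDictionary(32768);
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(baos.toByteArray()));
    KeyValue back1 = CompressedKeyValue.readFields(in, readDict);
    KeyValue back2 = CompressedKeyValue.readFields(in, readDict);

    System.out.println("round-tripped: " + back1 + " / " + back2);
    // kv.write() would prepend a 4-byte length to each KeyValue, hence + 8
    System.out.println("compressed: " + baos.size() + " bytes, uncompressed: "
        + (kv1.getLength() + kv2.getLength() + 8) + " bytes");
  }
}

Note that SimpleDictionary simply overwrites a slot on hash collision. That stays consistent only because reads mirror writes exactly: when the writer evicts an entry and later re-sends it raw, the reader evicts and re-adds the same entry at the same point in the stream.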