Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
===================================================================
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (revision 1561589)
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (working copy)
@@ -112,6 +112,16 @@
* optional bytes encryption_key = 2;
*/
com.google.protobuf.ByteString getEncryptionKey();
+
+ // optional bool has_tag_compression = 3;
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ boolean hasHasTagCompression();
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ boolean getHasTagCompression();
}
/**
* Protobuf type {@code WALHeader}
@@ -174,6 +184,11 @@
encryptionKey_ = input.readBytes();
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ hasTagCompression_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -246,9 +261,26 @@
return encryptionKey_;
}
+ // optional bool has_tag_compression = 3;
+ public static final int HAS_TAG_COMPRESSION_FIELD_NUMBER = 3;
+ private boolean hasTagCompression_;
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ public boolean hasHasTagCompression() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ public boolean getHasTagCompression() {
+ return hasTagCompression_;
+ }
+
private void initFields() {
hasCompression_ = false;
encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
+ hasTagCompression_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -268,6 +300,9 @@
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeBytes(2, encryptionKey_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ output.writeBool(3, hasTagCompression_);
+ }
getUnknownFields().writeTo(output);
}
@@ -285,6 +320,10 @@
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, encryptionKey_);
}
+ if (((bitField0_ & 0x00000004) == 0x00000004)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(3, hasTagCompression_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -318,6 +357,11 @@
result = result && getEncryptionKey()
.equals(other.getEncryptionKey());
}
+ result = result && (hasHasTagCompression() == other.hasHasTagCompression());
+ if (hasHasTagCompression()) {
+ result = result && (getHasTagCompression()
+ == other.getHasTagCompression());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -339,6 +383,10 @@
hash = (37 * hash) + ENCRYPTION_KEY_FIELD_NUMBER;
hash = (53 * hash) + getEncryptionKey().hashCode();
}
+ if (hasHasTagCompression()) {
+ hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getHasTagCompression());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -452,6 +500,8 @@
bitField0_ = (bitField0_ & ~0x00000001);
encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000002);
+ hasTagCompression_ = false;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -488,6 +538,10 @@
to_bitField0_ |= 0x00000002;
}
result.encryptionKey_ = encryptionKey_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000004;
+ }
+ result.hasTagCompression_ = hasTagCompression_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -510,6 +564,9 @@
if (other.hasEncryptionKey()) {
setEncryptionKey(other.getEncryptionKey());
}
+ if (other.hasHasTagCompression()) {
+ setHasTagCompression(other.getHasTagCompression());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -606,6 +663,39 @@
return this;
}
+ // optional bool has_tag_compression = 3;
+ private boolean hasTagCompression_ ;
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ public boolean hasHasTagCompression() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ public boolean getHasTagCompression() {
+ return hasTagCompression_;
+ }
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ public Builder setHasTagCompression(boolean value) {
+ bitField0_ |= 0x00000004;
+ hasTagCompression_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional bool has_tag_compression = 3;
+ */
+ public Builder clearHasTagCompression() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ hasTagCompression_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:WALHeader)
}
@@ -5084,25 +5174,26 @@
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n\tWAL.proto\032\013HBase.proto\"<\n\tWALHeader\022\027\n" +
+ "\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
"\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
- "\030\002 \001(\014\"\202\002\n\006WALKey\022\033\n\023encoded_region_name" +
- "\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023log_sequen" +
- "ce_number\030\003 \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\nc" +
- "luster_id\030\005 \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003" +
- "(\0132\014.FamilyScope\022\032\n\022following_kv_count\030\007" +
- " \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonc" +
- "eGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilySco" +
- "pe\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n",
- ".ScopeType\"\251\001\n\024CompactionDescriptor\022\022\n\nt" +
- "able_name\030\001 \002(\014\022\033\n\023encoded_region_name\030\002" +
- " \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compaction_" +
- "input\030\004 \003(\t\022\031\n\021compaction_output\030\005 \003(\t\022\026" +
- "\n\016store_home_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\t" +
- "ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
- "\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
- "he.hadoop.hbase.protobuf.generatedB\tWALP" +
- "rotosH\001\210\001\000\240\001\001"
+ "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" +
+ "WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
+ "able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
+ " \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
+ " \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
+ "Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
+ "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
+ "\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
+ "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
+ "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
+ " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
+ "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
+ "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
+ "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
+ "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
+ "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
+ "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
+ "\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5114,7 +5205,7 @@
internal_static_WALHeader_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_WALHeader_descriptor,
- new java.lang.String[] { "HasCompression", "EncryptionKey", });
+ new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", });
internal_static_WALKey_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_WALKey_fieldAccessorTable = new
Index: hbase-protocol/src/main/protobuf/WAL.proto
===================================================================
--- hbase-protocol/src/main/protobuf/WAL.proto (revision 1561589)
+++ hbase-protocol/src/main/protobuf/WAL.proto (working copy)
@@ -26,6 +26,7 @@
message WALHeader {
optional bool has_compression = 1;
optional bytes encryption_key = 2;
+ optional bool has_tag_compression = 3;
}
// Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (working copy)
@@ -22,7 +22,6 @@
import java.lang.reflect.InvocationTargetException;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.TagCompressionContext;
import org.apache.hadoop.hbase.io.util.Dictionary;
@@ -44,8 +43,8 @@
TagCompressionContext tagCompressionContext = null;
public CompressionContext(Class extends Dictionary> dictType, boolean recoveredEdits,
- Configuration conf) throws SecurityException, NoSuchMethodException, InstantiationException,
- IllegalAccessException, InvocationTargetException {
+ boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
+ InstantiationException, IllegalAccessException, InvocationTargetException {
Constructor extends Dictionary> dictConstructor =
dictType.getConstructor();
regionDict = dictConstructor.newInstance();
@@ -64,7 +63,7 @@
rowDict.init(Short.MAX_VALUE);
familyDict.init(Byte.MAX_VALUE);
qualifierDict.init(Byte.MAX_VALUE);
- if (conf != null && conf.getBoolean(ENABLE_WAL_TAGS_COMPRESSION, true)) {
+ if (hasTagCompression) {
tagCompressionContext = new TagCompressionContext(dictType);
}
}
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (working copy)
@@ -61,6 +61,7 @@
protected Codec.Decoder cellDecoder;
protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
protected boolean hasCompression = false;
+ protected boolean hasTagCompression = false;
// walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
// in the hlog, the inputstream's position is equal to walEditsStopOffset.
private long walEditsStopOffset;
@@ -117,6 +118,7 @@
if (isFirst) {
WALProtos.WALHeader header = builder.build();
this.hasCompression = header.hasHasCompression() && header.getHasCompression();
+ this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
}
this.inputStream = stream;
this.walEditsStopOffset = this.fileLength;
@@ -202,6 +204,11 @@
}
@Override
+ protected boolean hasTagCompression() {
+ return this.hasTagCompression;
+ }
+
+ @Override
protected boolean readNext(HLog.Entry entry) throws IOException {
while (true) {
// OriginalPosition might be < 0 on local fs; if so, it is useless to us.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (working copy)
@@ -78,7 +78,11 @@
FSUtils.getDefaultBlockSize(fs, path));
output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
output.write(ProtobufLogReader.PB_WAL_MAGIC);
- buildWALHeader(WALHeader.newBuilder().setHasCompression(doCompress)).writeDelimitedTo(output);
+ boolean doTagCompress = doCompress
+ && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
+ buildWALHeader(
+ WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
+ .writeDelimitedTo(output);
initAfterHeader(doCompress);
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java (working copy)
@@ -74,7 +74,7 @@
try {
if (compressionContext == null) {
compressionContext = new CompressionContext(LRUDictionary.class,
- FSUtils.isRecoveredEdits(path), conf);
+ FSUtils.isRecoveredEdits(path), hasTagCompression());
} else {
compressionContext.clear();
}
@@ -148,6 +148,11 @@
protected abstract boolean hasCompression();
/**
+ * @return Whether tag compression is enabled for this log.
+ */
+ protected abstract boolean hasTagCompression();
+
+ /**
* Read next entry.
* @param e The entry to read into.
* @return Whether there was anything to read.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (working copy)
@@ -193,6 +193,12 @@
return isWALCompressionEnabled(reader.getMetadata());
}
+ @Override
+ protected boolean hasTagCompression() {
+ // Tag compression not supported with old SequenceFileLog Reader/Writer
+ return false;
+ }
+
/**
* Call this method after init() has been executed
* @return whether WAL compression is enabled
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java (working copy)
@@ -48,7 +48,8 @@
if (doCompress) {
try {
this.compressionContext = new CompressionContext(LRUDictionary.class,
- FSUtils.isRecoveredEdits(path), conf);
+ FSUtils.isRecoveredEdits(path), conf.getBoolean(
+ CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
} catch (Exception e) {
throw new IOException("Failed to initiate CompressionContext", e);
}
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java (revision 1561589)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java (working copy)
@@ -68,7 +68,7 @@
}
private void runTestCycle(List kvs) throws Exception {
- CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
+ CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
for (KeyValue kv : kvs) {
KeyValueCompression.writeKV(buf, kv, ctx);
@@ -85,7 +85,7 @@
@Test
public void testKVWithTags() throws Exception {
- CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
+ CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
KeyValueCompression.writeKV(buf, createKV(1), ctx);
KeyValueCompression.writeKV(buf, createKV(0), ctx);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java (revision 1561589)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java (working copy)
@@ -55,7 +55,7 @@
Configuration conf = new Configuration(false);
conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
- conf));
+ compressTags));
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
Encoder encoder = codec.getEncoder(bos);
encoder.write(createKV(1));