Index: hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
===================================================================
--- hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (revision 1561589)
+++ hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java (working copy)
@@ -112,6 +112,16 @@
      * optional bytes encryption_key = 2;
      */
     com.google.protobuf.ByteString getEncryptionKey();
+
+    // optional bool has_tag_compression = 3;
+    /**
+     * optional bool has_tag_compression = 3;
+     */
+    boolean hasHasTagCompression();
+    /**
+     * optional bool has_tag_compression = 3;
+     */
+    boolean getHasTagCompression();
   }
   /**
    * Protobuf type {@code WALHeader}
    */
@@ -174,6 +184,11 @@
               encryptionKey_ = input.readBytes();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              hasTagCompression_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -246,9 +261,26 @@
       return encryptionKey_;
     }
 
+    // optional bool has_tag_compression = 3;
+    public static final int HAS_TAG_COMPRESSION_FIELD_NUMBER = 3;
+    private boolean hasTagCompression_;
+    /**
+     * optional bool has_tag_compression = 3;
+     */
+    public boolean hasHasTagCompression() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * optional bool has_tag_compression = 3;
+     */
+    public boolean getHasTagCompression() {
+      return hasTagCompression_;
+    }
+
     private void initFields() {
       hasCompression_ = false;
       encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
+      hasTagCompression_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -268,6 +300,9 @@
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBytes(2, encryptionKey_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBool(3, hasTagCompression_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -285,6 +320,10 @@
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(2, encryptionKey_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(3, hasTagCompression_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -318,6 +357,11 @@
         result = result && getEncryptionKey()
             .equals(other.getEncryptionKey());
       }
+      result = result && (hasHasTagCompression() == other.hasHasTagCompression());
+      if (hasHasTagCompression()) {
+        result = result && (getHasTagCompression()
+            == other.getHasTagCompression());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -339,6 +383,10 @@
         hash = (37 * hash) + ENCRYPTION_KEY_FIELD_NUMBER;
         hash = (53 * hash) + getEncryptionKey().hashCode();
       }
+      if (hasHasTagCompression()) {
+        hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getHasTagCompression());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -452,6 +500,8 @@
         bitField0_ = (bitField0_ & ~0x00000001);
         encryptionKey_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000002);
+        hasTagCompression_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -488,6 +538,10 @@
           to_bitField0_ |= 0x00000002;
         }
         result.encryptionKey_ = encryptionKey_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.hasTagCompression_ = hasTagCompression_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -510,6 +564,9 @@
         if (other.hasEncryptionKey()) {
           setEncryptionKey(other.getEncryptionKey());
         }
+        if (other.hasHasTagCompression()) {
+          setHasTagCompression(other.getHasTagCompression());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -606,6 +663,39 @@
         return this;
       }
 
+      // optional bool has_tag_compression = 3;
+      private boolean hasTagCompression_ ;
+      /**
+       * optional bool has_tag_compression = 3;
+       */
+      public boolean hasHasTagCompression() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * optional bool has_tag_compression = 3;
+       */
+      public boolean getHasTagCompression() {
+        return hasTagCompression_;
+      }
+      /**
+       * optional bool has_tag_compression = 3;
+       */
+      public Builder setHasTagCompression(boolean value) {
+        bitField0_ |= 0x00000004;
+        hasTagCompression_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional bool has_tag_compression = 3;
+       */
+      public Builder clearHasTagCompression() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        hasTagCompression_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:WALHeader)
     }
@@ -5084,25 +5174,26 @@
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\tWAL.proto\032\013HBase.proto\"<\n\tWALHeader\022\027\n" +
+      "\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" +
       "\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" +
-      "\030\002 \001(\014\"\202\002\n\006WALKey\022\033\n\023encoded_region_name" +
-      "\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023log_sequen" +
-      "ce_number\030\003 \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\nc" +
-      "luster_id\030\005 \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003" +
-      "(\0132\014.FamilyScope\022\032\n\022following_kv_count\030\007" +
-      " \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonc" +
-      "eGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilySco" +
-      "pe\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n",
-      ".ScopeType\"\251\001\n\024CompactionDescriptor\022\022\n\nt" +
-      "able_name\030\001 \002(\014\022\033\n\023encoded_region_name\030\002" +
-      " \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compaction_" +
-      "input\030\004 \003(\t\022\031\n\021compaction_output\030\005 \003(\t\022\026" +
-      "\n\016store_home_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\t" +
-      "ScopeType\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034" +
-      "\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*org.apac" +
-      "he.hadoop.hbase.protobuf.generatedB\tWALP" +
-      "rotosH\001\210\001\000\240\001\001"
+      "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" +
+      "WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" +
+      "able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" +
+      " \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" +
+      " \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" +
+      "Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" +
+      "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" +
+      "\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil",
+      "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" +
+      "\251\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" +
+      " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" +
+      "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" +
+      "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" +
+      "e_dir\030\006 \002(\t\"\014\n\nWALTrailer*F\n\tScopeType\022\033" +
+      "\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATI" +
+      "ON_SCOPE_GLOBAL\020\001B?\n*org.apache.hadoop.h" +
+      "base.protobuf.generatedB\tWALProtosH\001\210\001\000\240" +
+      "\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5114,7 +5205,7 @@
           internal_static_WALHeader_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_WALHeader_descriptor,
-              new java.lang.String[] { "HasCompression", "EncryptionKey", });
+              new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", });
           internal_static_WALKey_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_WALKey_fieldAccessorTable = new
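Note (not part of the patch): the regenerated code above gives WALHeader the usual protobuf hasX()/getX()/setX() accessors for field 3. A minimal usage sketch, assuming only the generated WALProtos classes shown above:

  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;

  // Sketch only: build a header with the new flag and read it back.
  WALHeader header = WALHeader.newBuilder()
      .setHasCompression(true)
      .setHasTagCompression(true)
      .build();
  assert header.hasHasTagCompression();  // field 3 was explicitly set
  assert header.getHasTagCompression();  // and carries the value true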
Index: hbase-protocol/src/main/protobuf/WAL.proto
===================================================================
--- hbase-protocol/src/main/protobuf/WAL.proto (revision 1561589)
+++ hbase-protocol/src/main/protobuf/WAL.proto (working copy)
@@ -26,6 +26,7 @@
 message WALHeader {
   optional bool has_compression = 1;
   optional bytes encryption_key = 2;
+  optional bool has_tag_compression = 3;
 }
 
 // Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs
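Note (not part of the patch): because the new field is optional, a WALHeader written before this change simply carries no field 3. A small sketch of what a reader then observes, assuming only standard protobuf optional-field semantics:

  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;

  // Sketch only: a header built without field 3 reports "not set" and the getter
  // falls back to the proto default (false), so tag compression stays disabled.
  WALProtos.WALHeader legacy = WALProtos.WALHeader.newBuilder()
      .setHasCompression(true)
      .build();
  boolean tagCompression = legacy.hasHasTagCompression() && legacy.getHasTagCompression();
  // tagCompression is false here, matching the check ProtobufLogReader performs below.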
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java (working copy)
@@ -22,7 +22,6 @@
 import java.lang.reflect.InvocationTargetException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.TagCompressionContext;
 import org.apache.hadoop.hbase.io.util.Dictionary;
 
@@ -44,8 +43,8 @@
   TagCompressionContext tagCompressionContext = null;
 
   public CompressionContext(Class dictType, boolean recoveredEdits,
-      Configuration conf) throws SecurityException, NoSuchMethodException, InstantiationException,
-      IllegalAccessException, InvocationTargetException {
+      boolean hasTagCompression) throws SecurityException, NoSuchMethodException,
+      InstantiationException, IllegalAccessException, InvocationTargetException {
     Constructor dictConstructor = dictType.getConstructor();
     regionDict = dictConstructor.newInstance();
 
@@ -64,7 +63,7 @@
     rowDict.init(Short.MAX_VALUE);
     familyDict.init(Byte.MAX_VALUE);
     qualifierDict.init(Byte.MAX_VALUE);
-    if (conf != null && conf.getBoolean(ENABLE_WAL_TAGS_COMPRESSION, true)) {
+    if (hasTagCompression) {
       tagCompressionContext = new TagCompressionContext(dictType);
     }
   }
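Note (not part of the patch): with the Configuration parameter replaced by an explicit boolean, the caller now decides whether a TagCompressionContext is allocated at all. A sketch of assumed usage (the helper name newWalContext is illustrative only):

  import org.apache.hadoop.hbase.io.util.LRUDictionary;
  import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;

  // Sketch only: second argument is still "recoveredEdits"; the new third argument
  // controls whether tag dictionaries are set up for this WAL.
  static CompressionContext newWalContext(boolean hasTagCompression) throws Exception {
    return new CompressionContext(LRUDictionary.class, false, hasTagCompression);
  }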
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java (working copy)
@@ -61,6 +61,7 @@
   protected Codec.Decoder cellDecoder;
   protected WALCellCodec.ByteStringUncompressor byteStringUncompressor;
   protected boolean hasCompression = false;
+  protected boolean hasTagCompression = false;
   // walEditsStopOffset is the position of the last byte to read. After reading the last WALEdit entry
   // in the hlog, the inputstream's position is equal to walEditsStopOffset.
   private long walEditsStopOffset;
@@ -117,6 +118,7 @@
     if (isFirst) {
       WALProtos.WALHeader header = builder.build();
       this.hasCompression = header.hasHasCompression() && header.getHasCompression();
+      this.hasTagCompression = header.hasHasTagCompression() && header.getHasTagCompression();
     }
     this.inputStream = stream;
     this.walEditsStopOffset = this.fileLength;
@@ -202,6 +204,11 @@
   }
 
   @Override
+  protected boolean hasTagCompression() {
+    return this.hasTagCompression;
+  }
+
+  @Override
   protected boolean readNext(HLog.Entry entry) throws IOException {
     while (true) {
       // OriginalPosition might be < 0 on local fs; if so, it is useless to us.
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java (working copy)
@@ -78,7 +78,11 @@
         FSUtils.getDefaultBlockSize(fs, path));
     output = fs.createNonRecursive(path, overwritable, bufferSize, replication, blockSize, null);
     output.write(ProtobufLogReader.PB_WAL_MAGIC);
-    buildWALHeader(WALHeader.newBuilder().setHasCompression(doCompress)).writeDelimitedTo(output);
+    boolean doTagCompress = doCompress
+        && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
+    buildWALHeader(
+      WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress))
+        .writeDelimitedTo(output);
     initAfterHeader(doCompress);
 
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java (working copy)
@@ -74,7 +74,7 @@
       try {
         if (compressionContext == null) {
           compressionContext = new CompressionContext(LRUDictionary.class,
-            FSUtils.isRecoveredEdits(path), conf);
+            FSUtils.isRecoveredEdits(path), hasTagCompression());
         } else {
           compressionContext.clear();
         }
@@ -148,6 +148,11 @@
   protected abstract boolean hasCompression();
 
   /**
+   * @return Whether tag compression is enabled for this log.
+   */
+  protected abstract boolean hasTagCompression();
+
+  /**
    * Read next entry.
    * @param e The entry to read into.
   * @return Whether there was anything to read.
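Note (not part of the patch): the design here is that the writer consults the configuration key only once, at file creation, records the decision in the WAL header, and the reader then trusts the header rather than its own configuration, so files remain readable even if the setting changes later. A sketch of how a deployment could turn the feature off explicitly, assuming only the constant already used above:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;

  // Sketch only: tag compression piggybacks on WAL compression and defaults to on;
  // setting the flag to false makes new WALs advertise has_tag_compression = false.
  Configuration conf = HBaseConfiguration.create();
  conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, false);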
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java (working copy)
@@ -193,6 +193,12 @@
     return isWALCompressionEnabled(reader.getMetadata());
   }
 
+  @Override
+  protected boolean hasTagCompression() {
+    // Tag compression not supported with old SequenceFileLog Reader/Writer
+    return false;
+  }
+
   /**
    * Call this method after init() has been executed
    * @return whether WAL compression is enabled
Index: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java
===================================================================
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java (revision 1561589)
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WriterBase.java (working copy)
@@ -48,7 +48,8 @@
     if (doCompress) {
       try {
         this.compressionContext = new CompressionContext(LRUDictionary.class,
-          FSUtils.isRecoveredEdits(path), conf);
+          FSUtils.isRecoveredEdits(path), conf.getBoolean(
+            CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true));
       } catch (Exception e) {
         throw new IOException("Failed to initiate CompressionContext", e);
       }
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java (revision 1561589)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestKeyValueCompression.java (working copy)
@@ -68,7 +68,7 @@
   }
 
   private void runTestCycle(List kvs) throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
     DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
     for (KeyValue kv : kvs) {
       KeyValueCompression.writeKV(buf, kv, ctx);
@@ -85,7 +85,7 @@
 
   @Test
   public void testKVWithTags() throws Exception {
-    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, null);
+    CompressionContext ctx = new CompressionContext(LRUDictionary.class, false, false);
     DataOutputBuffer buf = new DataOutputBuffer(BUF_SIZE);
     KeyValueCompression.writeKV(buf, createKV(1), ctx);
     KeyValueCompression.writeKV(buf, createKV(0), ctx);
Index: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java
===================================================================
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java (revision 1561589)
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALCellCodecWithCompression.java (working copy)
@@ -55,7 +55,7 @@
     Configuration conf = new Configuration(false);
     conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, compressTags);
     WALCellCodec codec = new WALCellCodec(conf, new CompressionContext(LRUDictionary.class, false,
-        conf));
+        compressTags));
     ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
     Encoder encoder = codec.getEncoder(bos);
     encoder.write(createKV(1));
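Note (not part of the patch): the test changes hand the desired flag straight to CompressionContext instead of a Configuration. A minimal round-trip sketch in the same spirit, using the same imports as TestWALCellCodecWithCompression above; the method name and cell contents are illustrative, not the test's own createKV helper:

  // Sketch only: encode one cell through WALCellCodec with tag compression enabled;
  // the boolean given to CompressionContext must agree with the codec's configuration.
  static void encodeOneCell() throws Exception {
    Configuration conf = new Configuration(false);
    conf.setBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
    WALCellCodec codec = new WALCellCodec(conf,
        new CompressionContext(LRUDictionary.class, false, true));
    ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
    Encoder encoder = codec.getEncoder(bos);
    encoder.write(new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        Bytes.toBytes("v")));
    encoder.flush();
  }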