diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index e59163d..41e12b0 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -137,6 +137,21 @@ public final class WALProtos { */ com.google.protobuf.ByteString getWriterClsNameBytes(); + + // optional string cell_codec_cls_name = 5; + /** + * optional string cell_codec_cls_name = 5; + */ + boolean hasCellCodecClsName(); + /** + * optional string cell_codec_cls_name = 5; + */ + java.lang.String getCellCodecClsName(); + /** + * optional string cell_codec_cls_name = 5; + */ + com.google.protobuf.ByteString + getCellCodecClsNameBytes(); } /** * Protobuf type {@code WALHeader} @@ -209,6 +224,11 @@ public final class WALProtos { writerClsName_ = input.readBytes(); break; } + case 42: { + bitField0_ |= 0x00000010; + cellCodecClsName_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -340,11 +360,55 @@ public final class WALProtos { } } + // optional string cell_codec_cls_name = 5; + public static final int CELL_CODEC_CLS_NAME_FIELD_NUMBER = 5; + private java.lang.Object cellCodecClsName_; + /** + * optional string cell_codec_cls_name = 5; + */ + public boolean hasCellCodecClsName() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional string cell_codec_cls_name = 5; + */ + public java.lang.String getCellCodecClsName() { + java.lang.Object ref = cellCodecClsName_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + cellCodecClsName_ = s; + } + return s; + } + } + /** + * optional string 
cell_codec_cls_name = 5; + */ + public com.google.protobuf.ByteString + getCellCodecClsNameBytes() { + java.lang.Object ref = cellCodecClsName_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + cellCodecClsName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { hasCompression_ = false; encryptionKey_ = com.google.protobuf.ByteString.EMPTY; hasTagCompression_ = false; writerClsName_ = ""; + cellCodecClsName_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -370,6 +434,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(4, getWriterClsNameBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBytes(5, getCellCodecClsNameBytes()); + } getUnknownFields().writeTo(output); } @@ -395,6 +462,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, getWriterClsNameBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(5, getCellCodecClsNameBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -438,6 +509,11 @@ public final class WALProtos { result = result && getWriterClsName() .equals(other.getWriterClsName()); } + result = result && (hasCellCodecClsName() == other.hasCellCodecClsName()); + if (hasCellCodecClsName()) { + result = result && getCellCodecClsName() + .equals(other.getCellCodecClsName()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -467,6 +543,10 @@ public final class WALProtos { hash = (37 * hash) + WRITER_CLS_NAME_FIELD_NUMBER; hash = (53 * hash) + getWriterClsName().hashCode(); } + if (hasCellCodecClsName()) { + hash = (37 * hash) + 
CELL_CODEC_CLS_NAME_FIELD_NUMBER; + hash = (53 * hash) + getCellCodecClsName().hashCode(); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -584,6 +664,8 @@ public final class WALProtos { bitField0_ = (bitField0_ & ~0x00000004); writerClsName_ = ""; bitField0_ = (bitField0_ & ~0x00000008); + cellCodecClsName_ = ""; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -628,6 +710,10 @@ public final class WALProtos { to_bitField0_ |= 0x00000008; } result.writerClsName_ = writerClsName_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.cellCodecClsName_ = cellCodecClsName_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -658,6 +744,11 @@ public final class WALProtos { writerClsName_ = other.writerClsName_; onChanged(); } + if (other.hasCellCodecClsName()) { + bitField0_ |= 0x00000010; + cellCodecClsName_ = other.cellCodecClsName_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -861,6 +952,80 @@ public final class WALProtos { return this; } + // optional string cell_codec_cls_name = 5; + private java.lang.Object cellCodecClsName_ = ""; + /** + * optional string cell_codec_cls_name = 5; + */ + public boolean hasCellCodecClsName() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional string cell_codec_cls_name = 5; + */ + public java.lang.String getCellCodecClsName() { + java.lang.Object ref = cellCodecClsName_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + cellCodecClsName_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string cell_codec_cls_name = 5; + */ + public com.google.protobuf.ByteString + getCellCodecClsNameBytes() { + java.lang.Object ref = cellCodecClsName_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + 
com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + cellCodecClsName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * optional string cell_codec_cls_name = 5; + */ + public Builder setCellCodecClsName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + cellCodecClsName_ = value; + onChanged(); + return this; + } + /** + * optional string cell_codec_cls_name = 5; + */ + public Builder clearCellCodecClsName() { + bitField0_ = (bitField0_ & ~0x00000010); + cellCodecClsName_ = getDefaultInstance().getCellCodecClsName(); + onChanged(); + return this; + } + /** + * optional string cell_codec_cls_name = 5; + */ + public Builder setCellCodecClsNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000010; + cellCodecClsName_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALHeader) } @@ -5464,27 +5629,27 @@ public final class WALProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\tWAL.proto\032\013HBase.proto\"r\n\tWALHeader\022\027\n" + - "\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" + - "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017w" + - "riter_cls_name\030\004 \001(\t\"\202\002\n\006WALKey\022\033\n\023encod" + - "ed_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014" + - "\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite_t" + - "ime\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001" + - "\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022follow" + - "ing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132" + - "\005.UUID\022\022\n\nnonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(", - 
"\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n\nscop" + - "e_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024CompactionD" + - "escriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023encoded" + - "_region_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022" + - "\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compaction_" + - "output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013" + - "region_name\030\007 \001(\014\"\014\n\nWALTrailer*F\n\tScope" + - "Type\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REP" + - "LICATION_SCOPE_GLOBAL\020\001B?\n*org.apache.ha" + - "doop.hbase.protobuf.generatedB\tWALProtos", - "H\001\210\001\000\240\001\001" + "\n\tWAL.proto\032\013HBase.proto\"\217\001\n\tWALHeader\022\027" + + "\n\017has_compression\030\001 \001(\010\022\026\n\016encryption_ke" + + "y\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017" + + "writer_cls_name\030\004 \001(\t\022\033\n\023cell_codec_cls_" + + "name\030\005 \001(\t\"\202\002\n\006WALKey\022\033\n\023encoded_region_" + + "name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023log_se" + + "quence_number\030\003 \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022" + + "\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes" + + "\030\006 \003(\0132\014.FamilyScope\022\032\n\022following_kv_cou" + + "nt\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132\005.UUID\022\022\n\n", + "nonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(\004\"=\n\013Famil" + + "yScope\022\016\n\006family\030\001 \002(\014\022\036\n\nscope_type\030\002 \002" + + "(\0162\n.ScopeType\"\276\001\n\024CompactionDescriptor\022" + + "\022\n\ntable_name\030\001 \002(\014\022\033\n\023encoded_region_na" + + "me\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022\030\n\020compact" + + "ion_input\030\004 
\003(\t\022\031\n\021compaction_output\030\005 \003" + + "(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013region_nam" + + "e\030\007 \001(\014\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027RE" + + "PLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_S" + + "COPE_GLOBAL\020\001B?\n*org.apache.hadoop.hbase", + ".protobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5496,7 +5661,7 @@ public final class WALProtos { internal_static_WALHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALHeader_descriptor, - new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", }); + new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", "CellCodecClsName", }); internal_static_WALKey_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_WALKey_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index 35708d5..9a108f0 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -28,6 +28,7 @@ message WALHeader { optional bytes encryption_key = 2; optional bool has_tag_compression = 3; optional string writer_cls_name = 4; + optional string cell_codec_cls_name = 5; } // Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 8f0f1c0..e8a6898 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -78,6 +78,24 @@ public class ProtobufLogReader extends ReaderBase {
     SUCCESS,
     UNKNOWN_WRITER_CLS // name of writer class isn't recognized
   }
+
+  /** Outcome of reading a WAL header, plus the cell codec class name it named (may be null). */
+  static class WALHdrContext {
+    private final WALHdrResult result;
+    private final String cellCodecClsName;
+
+    WALHdrContext(WALHdrResult result, String cellCodecClsName) {
+      this.result = result;
+      this.cellCodecClsName = cellCodecClsName;
+    }
+    WALHdrResult getResult() {
+      return result;
+    }
+    String getCellCodecClsName() {
+      return cellCodecClsName;
+    }
+  }
+
   public ProtobufLogReader() {
     super();
   }
@@ -97,13 +115,13 @@ public class ProtobufLogReader extends ReaderBase {
 
   @Override
   public void reset() throws IOException {
-    initInternal(null, false);
-    initAfterCompression(); // We need a new decoder (at least).
+    String clsName = initInternal(null, false);
+    initAfterCompression(clsName); // We need a new decoder (at least).
} @Override - protected void initReader(FSDataInputStream stream) throws IOException { - initInternal(stream, true); + protected String initReader(FSDataInputStream stream) throws IOException { + return initInternal(stream, true); } /* @@ -113,18 +131,22 @@ public class ProtobufLogReader extends ReaderBase { return writerClsNames; } - protected WALHdrResult readHeader(Builder builder, FSDataInputStream stream) + protected WALHdrContext readHeader(Builder builder, FSDataInputStream stream) throws IOException { boolean res = builder.mergeDelimitedFrom(stream); - if (!res) return WALHdrResult.EOF; + if (!res) return new WALHdrContext(WALHdrResult.EOF, null); if (builder.hasWriterClsName() && !getWriterClsNames().contains(builder.getWriterClsName())) { - return WALHdrResult.UNKNOWN_WRITER_CLS; + return new WALHdrContext(WALHdrResult.UNKNOWN_WRITER_CLS, null); + } + String clsName = null; + if (builder.hasCellCodecClsName()) { + clsName = builder.getCellCodecClsName(); } - return WALHdrResult.SUCCESS; + return new WALHdrContext(WALHdrResult.SUCCESS, clsName); } - private void initInternal(FSDataInputStream stream, boolean isFirst) + private String initInternal(FSDataInputStream stream, boolean isFirst) throws IOException { close(); long expectedPos = PB_WAL_MAGIC.length; @@ -137,7 +159,8 @@ public class ProtobufLogReader extends ReaderBase { } // Initialize metadata or, when we reset, just skip the header. 
WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder(); - WALHdrResult walHdrRes = readHeader(builder, stream); + WALHdrContext hdrCtxt = readHeader(builder, stream); + WALHdrResult walHdrRes = hdrCtxt.getResult(); if (walHdrRes == WALHdrResult.EOF) { throw new EOFException("Couldn't read WAL PB header"); } @@ -158,6 +181,7 @@ public class ProtobufLogReader extends ReaderBase { LOG.trace("After reading the trailer: walEditsStopOffset: " + this.walEditsStopOffset + ", fileLength: " + this.fileLength + ", " + "trailerPresent: " + trailerPresent); } + return hdrCtxt.getCellCodecClsName(); } /** @@ -213,14 +237,14 @@ public class ProtobufLogReader extends ReaderBase { return false; } - protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) - throws IOException { - return WALCellCodec.create(conf, compressionContext); + protected WALCellCodec getCodec(Configuration conf, String cellCodecClsName, + CompressionContext compressionContext) throws IOException { + return WALCellCodec.create(conf, cellCodecClsName, compressionContext); } @Override - protected void initAfterCompression() throws IOException { - WALCellCodec codec = getCodec(this.conf, this.compressionContext); + protected void initAfterCompression(String cellCodecClsName) throws IOException { + WALCellCodec codec = getCodec(this.conf, cellCodecClsName, this.compressionContext); this.cellDecoder = codec.getDecoder(this.inputStream); if (this.hasCompression) { this.byteStringUncompressor = codec.getByteStringUncompressor(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index ec4cb70..a4c1ade 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -55,13 +55,17 @@ public 
class ProtobufLogWriter extends WriterBase { protected WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) throws IOException { - return WALCellCodec.create(conf, compressionContext); + return WALCellCodec.create(conf, null, compressionContext); } - protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException { + protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder) + throws IOException { if (!builder.hasWriterClsName()) { builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName()); } + if (!builder.hasCellCodecClsName()) { + builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf)); + } return builder.build(); } @@ -82,7 +86,7 @@ public class ProtobufLogWriter extends WriterBase { output.write(ProtobufLogReader.PB_WAL_MAGIC); boolean doTagCompress = doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); - buildWALHeader( + buildWALHeader(conf, WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)) .writeDelimitedTo(output); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java index faec841..79dca1c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java @@ -66,7 +66,7 @@ public abstract class ReaderBase implements HLog.Reader { this.fileLength = this.fs.getFileStatus(path).getLen(); this.trailerWarnSize = conf.getInt(HLog.WAL_TRAILER_WARN_SIZE, HLog.DEFAULT_WAL_TRAILER_WARN_SIZE); - initReader(stream); + String cellCodecClsName = initReader(stream); boolean compression = hasCompression(); if (compression) { @@ -82,7 +82,7 @@ public abstract class ReaderBase implements HLog.Reader { throw new IOException("Failed to initialize CompressionContext", e); } 
} - initAfterCompression(); + initAfterCompression(cellCodecClsName); } @Override @@ -135,13 +135,15 @@ public abstract class ReaderBase implements HLog.Reader { /** * Initializes the log reader with a particular stream (may be null). * Reader assumes ownership of the stream if not null and may use it. Called once. + * @return the class name of cell Codec, null if such information is not available */ - protected abstract void initReader(FSDataInputStream stream) throws IOException; + protected abstract String initReader(FSDataInputStream stream) throws IOException; /** * Initializes the compression after the shared stuff has been initialized. Called once. + * @param cellCodecClsName class name of cell Codec */ - protected abstract void initAfterCompression() throws IOException; + protected abstract void initAfterCompression(String cellCodecClsName) throws IOException; /** * @return Whether compression is enabled for this log. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java index 7b025f8..e4cdf14 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.io.crypto.Cipher; import org.apache.hadoop.hbase.io.crypto.Decryptor; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; +import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WALHdrResult; import org.apache.hadoop.hbase.security.EncryptionUtil; import org.apache.hadoop.hbase.security.User; @@ -52,9 +53,10 @@ public class SecureProtobufLogReader extends ProtobufLogReader { } @Override - protected WALHdrResult readHeader(WALHeader.Builder builder, FSDataInputStream stream) 
+  protected WALHdrContext readHeader(WALHeader.Builder builder, FSDataInputStream stream)
       throws IOException {
-    WALHdrResult result = super.readHeader(builder, stream);
+    WALHdrContext hdrCtxt = super.readHeader(builder, stream);
+    WALHdrResult result = hdrCtxt.getResult();
     // We need to unconditionally handle the case where the WAL has a key in
     // the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is
     // no longer set in the site configuration.
@@ -121,19 +123,22 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
       }
     }
 
-    return result;
+    return hdrCtxt;
   }
 
   @Override
-  protected void initAfterCompression() throws IOException {
-    if (decryptor != null) {
+  protected void initAfterCompression(String cellCodecClsName) throws IOException {
+    // A null name means the header carried no cell_codec_cls_name; keep the earlier behaviour of
+    // always decoding with SecureWALCellCodec when a decryptor is present (and avoid an NPE).
+    if (decryptor != null && (cellCodecClsName == null
+        || SecureWALCellCodec.class.getName().equals(cellCodecClsName))) {
       WALCellCodec codec = SecureWALCellCodec.getCodec(this.conf, decryptor);
       this.cellDecoder = codec.getDecoder(this.inputStream);
       // We do not support compression with WAL encryption
       this.compressionContext = null;
       this.hasCompression = false;
     } else {
-      super.initAfterCompression();
+      super.initAfterCompression(cellCodecClsName);
     }
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
index fa95388..0bffa45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java
@@ -24,6 +24,7 @@ import java.security.SecureRandom;
 
 import javax.crypto.spec.SecretKeySpec;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,7 +44,8 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter
{ private Encryptor encryptor = null; @Override - protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException { + protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder) + throws IOException { builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName()); if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) { // Get an instance of our cipher @@ -72,8 +74,8 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter { LOG.trace("Initialized secure protobuf WAL: cipher=" + cipher.getName()); } } - - return super.buildWALHeader(builder); + builder.setCellCodecClsName(SecureWALCellCodec.class.getName()); + return super.buildWALHeader(conf, builder); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java index 9fa6f98..66e0c17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogReader.java @@ -175,16 +175,17 @@ public class SequenceFileLogReader extends ReaderBase { } @Override - protected void initReader(FSDataInputStream stream) throws IOException { + protected String initReader(FSDataInputStream stream) throws IOException { // We don't use the stream because we have to have the magic stream above. 
if (stream != null) {
       stream.close();
     }
     reset();
+    return null;
   }
 
   @Override
-  protected void initAfterCompression() throws IOException {
+  protected void initAfterCompression(String cellCodecClsName) throws IOException {
     // Nothing to do here
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 70dc575..93bc1cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -77,20 +77,29 @@ public class WALCellCodec implements Codec {
     this.compression = compression;
   }
 
+  static String getWALCellCodecClass(Configuration conf) {
+    return conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
+  }
+
   /**
-   * Create and setup a {@link WALCellCodec} from the {@link Configuration} and CompressionContext,
-   * if they have been specified. Fully prepares the codec for use.
+   * Create and setup a {@link WALCellCodec} from the {@code cellCodecClsName} and
+   * CompressionContext, if {@code cellCodecClsName} is specified.
+   * Otherwise Cell Codec classname is read from {@link Configuration}.
+   * Fully prepares the codec for use.
    * @param conf {@link Configuration} to read for the user-specified codec. If none is specified,
    *          uses a {@link WALCellCodec}.
+   * @param cellCodecClsName name of the codec class to instantiate; if null, it is read from conf
    * @param compression compression the codec should use
    * @return a {@link WALCellCodec} ready for use.
    * @throws UnsupportedOperationException if the codec cannot be instantiated
    */
-  public static WALCellCodec create(Configuration conf, CompressionContext compression)
-      throws UnsupportedOperationException {
-    String className = conf.get(WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
-    return ReflectionUtils.instantiateWithCustomCtor(className, new Class[] { Configuration.class,
-        CompressionContext.class }, new Object[] { conf, compression });
+  public static WALCellCodec create(Configuration conf, String cellCodecClsName,
+      CompressionContext compression) throws UnsupportedOperationException {
+    if (cellCodecClsName == null) {
+      cellCodecClsName = getWALCellCodecClass(conf);
+    }
+    return ReflectionUtils.instantiateWithCustomCtor(cellCodecClsName, new Class[]
+        { Configuration.class, CompressionContext.class }, new Object[] { conf, compression });
   }
 
   public interface ByteStringCompressor {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java
index b992aca..cbe1a06 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestCustomWALCellCodec.java
@@ -53,7 +53,7 @@ public class TestCustomWALCellCodec {
     Configuration conf = new Configuration(false);
     conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class,
       WALCellCodec.class);
-    CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null);
+    CustomWALCellCodec codec = (CustomWALCellCodec) WALCellCodec.create(conf, null, null);
     assertEquals("Custom codec didn't get initialized with the right configuration!", conf,
       codec.conf);
     assertEquals("Custom codec didn't get initialized with the right compression context!", null,
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java index 1ad88b1..b09d361 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.MediumTests; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.regionserver.wal.TestCustomWALCellCodec.CustomWALCellCodec; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; @@ -74,6 +75,9 @@ public class TestHLogReaderOnSecureHLog { private Path writeWAL(String tblName) throws IOException { Configuration conf = TEST_UTIL.getConfiguration(); + String clsName = conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); + conf.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, CustomWALCellCodec.class, + WALCellCodec.class); TableName tableName = TableName.valueOf(tblName); HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor(tableName.getName())); @@ -95,6 +99,8 @@ public class TestHLogReaderOnSecureHLog { } final Path walPath = ((FSHLog) wal).computeFilename(); wal.close(); + // restore the cell codec class + conf.set(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, clsName); return walPath; }