diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java index 57a9aa9..e59163d 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java @@ -122,6 +122,21 @@ public final class WALProtos { * optional bool has_tag_compression = 3; */ boolean getHasTagCompression(); + + // optional string writer_cls_name = 4; + /** + * optional string writer_cls_name = 4; + */ + boolean hasWriterClsName(); + /** + * optional string writer_cls_name = 4; + */ + java.lang.String getWriterClsName(); + /** + * optional string writer_cls_name = 4; + */ + com.google.protobuf.ByteString + getWriterClsNameBytes(); } /** * Protobuf type {@code WALHeader} @@ -189,6 +204,11 @@ public final class WALProtos { hasTagCompression_ = input.readBool(); break; } + case 34: { + bitField0_ |= 0x00000008; + writerClsName_ = input.readBytes(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -277,10 +297,54 @@ public final class WALProtos { return hasTagCompression_; } + // optional string writer_cls_name = 4; + public static final int WRITER_CLS_NAME_FIELD_NUMBER = 4; + private java.lang.Object writerClsName_; + /** + * optional string writer_cls_name = 4; + */ + public boolean hasWriterClsName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional string writer_cls_name = 4; + */ + public java.lang.String getWriterClsName() { + java.lang.Object ref = writerClsName_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + writerClsName_ = s; + } + return s; + } + } + /** + * optional string writer_cls_name = 4; 
+ */ + public com.google.protobuf.ByteString + getWriterClsNameBytes() { + java.lang.Object ref = writerClsName_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + writerClsName_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + private void initFields() { hasCompression_ = false; encryptionKey_ = com.google.protobuf.ByteString.EMPTY; hasTagCompression_ = false; + writerClsName_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -303,6 +367,9 @@ public final class WALProtos { if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(3, hasTagCompression_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBytes(4, getWriterClsNameBytes()); + } getUnknownFields().writeTo(output); } @@ -324,6 +391,10 @@ public final class WALProtos { size += com.google.protobuf.CodedOutputStream .computeBoolSize(3, hasTagCompression_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(4, getWriterClsNameBytes()); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -362,6 +433,11 @@ public final class WALProtos { result = result && (getHasTagCompression() == other.getHasTagCompression()); } + result = result && (hasWriterClsName() == other.hasWriterClsName()); + if (hasWriterClsName()) { + result = result && getWriterClsName() + .equals(other.getWriterClsName()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -387,6 +463,10 @@ public final class WALProtos { hash = (37 * hash) + HAS_TAG_COMPRESSION_FIELD_NUMBER; hash = (53 * hash) + hashBoolean(getHasTagCompression()); } + if (hasWriterClsName()) { + hash = (37 * hash) + WRITER_CLS_NAME_FIELD_NUMBER; + hash = (53 * hash) + getWriterClsName().hashCode(); + } hash = (29 * 
hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -502,6 +582,8 @@ public final class WALProtos { bitField0_ = (bitField0_ & ~0x00000002); hasTagCompression_ = false; bitField0_ = (bitField0_ & ~0x00000004); + writerClsName_ = ""; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -542,6 +624,10 @@ public final class WALProtos { to_bitField0_ |= 0x00000004; } result.hasTagCompression_ = hasTagCompression_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000008; + } + result.writerClsName_ = writerClsName_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -567,6 +653,11 @@ public final class WALProtos { if (other.hasHasTagCompression()) { setHasTagCompression(other.getHasTagCompression()); } + if (other.hasWriterClsName()) { + bitField0_ |= 0x00000008; + writerClsName_ = other.writerClsName_; + onChanged(); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -696,6 +787,80 @@ public final class WALProtos { return this; } + // optional string writer_cls_name = 4; + private java.lang.Object writerClsName_ = ""; + /** + * optional string writer_cls_name = 4; + */ + public boolean hasWriterClsName() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * optional string writer_cls_name = 4; + */ + public java.lang.String getWriterClsName() { + java.lang.Object ref = writerClsName_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + writerClsName_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string writer_cls_name = 4; + */ + public com.google.protobuf.ByteString + getWriterClsNameBytes() { + java.lang.Object ref = writerClsName_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + writerClsName_ = b; + return b; + } else { + return 
(com.google.protobuf.ByteString) ref; + } + } + /** + * optional string writer_cls_name = 4; + */ + public Builder setWriterClsName( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + writerClsName_ = value; + onChanged(); + return this; + } + /** + * optional string writer_cls_name = 4; + */ + public Builder clearWriterClsName() { + bitField0_ = (bitField0_ & ~0x00000008); + writerClsName_ = getDefaultInstance().getWriterClsName(); + onChanged(); + return this; + } + /** + * optional string writer_cls_name = 4; + */ + public Builder setWriterClsNameBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000008; + writerClsName_ = value; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:WALHeader) } @@ -5299,26 +5464,27 @@ public final class WALProtos { descriptor; static { java.lang.String[] descriptorData = { - "\n\tWAL.proto\032\013HBase.proto\"Y\n\tWALHeader\022\027\n" + + "\n\tWAL.proto\032\013HBase.proto\"r\n\tWALHeader\022\027\n" + "\017has_compression\030\001 \001(\010\022\026\n\016encryption_key" + - "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\"\202\002\n\006" + - "WALKey\022\033\n\023encoded_region_name\030\001 \002(\014\022\022\n\nt" + - "able_name\030\002 \002(\014\022\033\n\023log_sequence_number\030\003" + - " \002(\004\022\022\n\nwrite_time\030\004 \002(\004\022\035\n\ncluster_id\030\005" + - " \001(\0132\005.UUIDB\002\030\001\022\034\n\006scopes\030\006 \003(\0132\014.Family" + - "Scope\022\032\n\022following_kv_count\030\007 \001(\r\022\032\n\013clu" + - "ster_ids\030\010 \003(\0132\005.UUID\022\022\n\nnonceGroup\030\t \001(" + - "\004\022\r\n\005nonce\030\n \001(\004\"=\n\013FamilyScope\022\016\n\006famil", - "y\030\001 \002(\014\022\036\n\nscope_type\030\002 \002(\0162\n.ScopeType\"" + - 
"\276\001\n\024CompactionDescriptor\022\022\n\ntable_name\030\001" + - " \002(\014\022\033\n\023encoded_region_name\030\002 \002(\014\022\023\n\013fam" + - "ily_name\030\003 \002(\014\022\030\n\020compaction_input\030\004 \003(\t" + - "\022\031\n\021compaction_output\030\005 \003(\t\022\026\n\016store_hom" + - "e_dir\030\006 \002(\t\022\023\n\013region_name\030\007 \001(\014\"\014\n\nWALT" + - "railer*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE" + - "_LOCAL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?" + - "\n*org.apache.hadoop.hbase.protobuf.gener" + - "atedB\tWALProtosH\001\210\001\000\240\001\001" + "\030\002 \001(\014\022\033\n\023has_tag_compression\030\003 \001(\010\022\027\n\017w" + + "riter_cls_name\030\004 \001(\t\"\202\002\n\006WALKey\022\033\n\023encod" + + "ed_region_name\030\001 \002(\014\022\022\n\ntable_name\030\002 \002(\014" + + "\022\033\n\023log_sequence_number\030\003 \002(\004\022\022\n\nwrite_t" + + "ime\030\004 \002(\004\022\035\n\ncluster_id\030\005 \001(\0132\005.UUIDB\002\030\001" + + "\022\034\n\006scopes\030\006 \003(\0132\014.FamilyScope\022\032\n\022follow" + + "ing_kv_count\030\007 \001(\r\022\032\n\013cluster_ids\030\010 \003(\0132" + + "\005.UUID\022\022\n\nnonceGroup\030\t \001(\004\022\r\n\005nonce\030\n \001(", + "\004\"=\n\013FamilyScope\022\016\n\006family\030\001 \002(\014\022\036\n\nscop" + + "e_type\030\002 \002(\0162\n.ScopeType\"\276\001\n\024CompactionD" + + "escriptor\022\022\n\ntable_name\030\001 \002(\014\022\033\n\023encoded" + + "_region_name\030\002 \002(\014\022\023\n\013family_name\030\003 \002(\014\022" + + "\030\n\020compaction_input\030\004 \003(\t\022\031\n\021compaction_" + + "output\030\005 \003(\t\022\026\n\016store_home_dir\030\006 \002(\t\022\023\n\013" + + "region_name\030\007 \001(\014\"\014\n\nWALTrailer*F\n\tScope" + + "Type\022\033\n\027REPLICATION_SCOPE_LOCAL\020\000\022\034\n\030REP" + + "LICATION_SCOPE_GLOBAL\020\001B?\n*org.apache.ha" + + "doop.hbase.protobuf.generatedB\tWALProtos", 
+ "H\001\210\001\000\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5330,7 +5496,7 @@ public final class WALProtos { internal_static_WALHeader_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_WALHeader_descriptor, - new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", }); + new java.lang.String[] { "HasCompression", "EncryptionKey", "HasTagCompression", "WriterClsName", }); internal_static_WALKey_descriptor = getDescriptor().getMessageTypes().get(1); internal_static_WALKey_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto index ae3636e..35708d5 100644 --- a/hbase-protocol/src/main/protobuf/WAL.proto +++ b/hbase-protocol/src/main/protobuf/WAL.proto @@ -27,6 +27,7 @@ message WALHeader { optional bool has_compression = 1; optional bytes encryption_key = 2; optional bool has_tag_compression = 3; + optional string writer_cls_name = 4; } // Protocol buffer version of HLogKey; see HLogKey comment, not really a key but WALEdit header for some KVs diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java index 2b9130c..98dc432 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java @@ -167,6 +167,10 @@ public class HLogFactory { */ private static Class logWriterClass; + static void resetLogWriterClass() { + logWriterClass = null; + } + /** * Create a writer for the WAL. * @return A WAL writer. Close when done with it. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java index 11ef770..8f0f1c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java @@ -23,7 +23,9 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -66,7 +68,16 @@ public class ProtobufLogReader extends ReaderBase { // in the hlog, the inputstream's position is equal to walEditsStopOffset. private long walEditsStopOffset; private boolean trailerPresent; + private static List writerClsNames = new ArrayList(); + static { + writerClsNames.add(ProtobufLogWriter.class.getSimpleName()); + } + enum WALHdrResult { + EOF, // stream is at EOF when method starts + SUCCESS, + UNKNOWN_WRITER_CLS // name of writer class isn't recognized + } public ProtobufLogReader() { super(); } @@ -95,11 +106,26 @@ public class ProtobufLogReader extends ReaderBase { initInternal(stream, true); } - protected boolean readHeader(Builder builder, FSDataInputStream stream) throws IOException { - return builder.mergeDelimitedFrom(stream); + /* + * Returns names of the accepted writer classes + */ + protected List getWriterClsNames() { + return writerClsNames; + } + + protected WALHdrResult readHeader(Builder builder, FSDataInputStream stream) + throws IOException { + boolean res = builder.mergeDelimitedFrom(stream); + if (!res) return WALHdrResult.EOF; + if (builder.hasWriterClsName() && + !getWriterClsNames().contains(builder.getWriterClsName())) { + return WALHdrResult.UNKNOWN_WRITER_CLS; + } + return WALHdrResult.SUCCESS; } - private 
void initInternal(FSDataInputStream stream, boolean isFirst) throws IOException { + private void initInternal(FSDataInputStream stream, boolean isFirst) + throws IOException { close(); long expectedPos = PB_WAL_MAGIC.length; if (stream == null) { @@ -111,10 +137,13 @@ public class ProtobufLogReader extends ReaderBase { } // Initialize metadata or, when we reset, just skip the header. WALProtos.WALHeader.Builder builder = WALProtos.WALHeader.newBuilder(); - boolean hasHeader = readHeader(builder, stream); - if (!hasHeader) { + WALHdrResult walHdrRes = readHeader(builder, stream); + if (walHdrRes == WALHdrResult.EOF) { throw new EOFException("Couldn't read WAL PB header"); } + if (walHdrRes == WALHdrResult.UNKNOWN_WRITER_CLS) { + throw new IOException("Got unknown writer class: " + builder.getWriterClsName()); + } if (isFirst) { WALProtos.WALHeader header = builder.build(); this.hasCompression = header.hasHasCompression() && header.getHasCompression(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java index c8369df..ec4cb70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java @@ -59,6 +59,9 @@ public class ProtobufLogWriter extends WriterBase { } protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException { + if (!builder.hasWriterClsName()) { + builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName()); + } return builder.build(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java index 0c9de4b..7b025f8 100644 --- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java @@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.regionserver.wal; import java.io.IOException; import java.security.Key; import java.security.KeyException; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,15 +40,25 @@ public class SecureProtobufLogReader extends ProtobufLogReader { private static final Log LOG = LogFactory.getLog(SecureProtobufLogReader.class); private Decryptor decryptor = null; + private static List writerClsNames = new ArrayList(); + static { + writerClsNames.add(ProtobufLogWriter.class.getSimpleName()); + writerClsNames.add(SecureProtobufLogWriter.class.getSimpleName()); + } + + @Override + protected List getWriterClsNames() { + return writerClsNames; + } @Override - protected boolean readHeader(WALHeader.Builder builder, FSDataInputStream stream) + protected WALHdrResult readHeader(WALHeader.Builder builder, FSDataInputStream stream) throws IOException { - boolean result = super.readHeader(builder, stream); + WALHdrResult result = super.readHeader(builder, stream); // We need to unconditionally handle the case where the WAL has a key in // the header, meaning it is encrypted, even if ENABLE_WAL_ENCRYPTION is // no longer set in the site configuration. - if (result && builder.hasEncryptionKey()) { + if (result == WALHdrResult.SUCCESS && builder.hasEncryptionKey()) { // Serialized header data has been merged into the builder from the // stream. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java index ee100fd..fa95388 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogWriter.java @@ -44,6 +44,7 @@ public class SecureProtobufLogWriter extends ProtobufLogWriter { @Override protected WALHeader buildWALHeader(WALHeader.Builder builder) throws IOException { + builder.setWriterClsName(SecureProtobufLogWriter.class.getSimpleName()); if (conf.getBoolean(HConstants.ENABLE_WAL_ENCRYPTION, false)) { // Get an instance of our cipher Cipher cipher = Encryption.getCipher(conf, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java new file mode 100644 index 0000000..a23d4c7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogReaderOnSecureHLog.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.MediumTests; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; +import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.log4j.Level; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/* + * Test that verifies WAL written by SecureProtobufLogWriter is not readable by ProtobufLogReader + */ +@Category(MediumTests.class) +public class TestHLogReaderOnSecureHLog { + static final Log LOG = LogFactory.getLog(TestHLogReaderOnSecureHLog.class); + static { + ((Log4JLogger)LogFactory.getLog("org.apache.hadoop.hbase.regionserver.wal")) + .getLogger().setLevel(Level.ALL); + }; + static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility(); + final byte[] value = Bytes.toBytes("Test value"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); + conf.set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); + conf.setBoolean("hbase.hlog.split.skip.errors", true); + conf.setBoolean(HConstants.ENABLE_WAL_ENCRYPTION, true); + } + + private Path writeWAL(String tblName) throws IOException { + Configuration conf = TEST_UTIL.getConfiguration(); + TableName tableName = TableName.valueOf(tblName); + HTableDescriptor htd = new HTableDescriptor(tableName); + htd.addFamily(new HColumnDescriptor(tableName.getName())); + HRegionInfo regioninfo = new HRegionInfo(tableName, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, false); + final int total = 10; + final byte[] row = Bytes.toBytes("row"); + final byte[] family = Bytes.toBytes("family"); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path logDir = TEST_UTIL.getDataTestDir(tblName); + final AtomicLong sequenceId = new AtomicLong(1); + + // Write the WAL + FSHLog wal = new FSHLog(fs, TEST_UTIL.getDataTestDir(), logDir.toString(), conf); + for (int i = 0; i < total; i++) { + WALEdit kvs = new WALEdit(); + kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value)); + wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, sequenceId); + } + final Path walPath = ((FSHLog) wal).computeFilename(); + wal.close(); + + return walPath; + } + + @Test() + public void testHLogReaderOnSecureHLog() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + HLogFactory.resetLogReaderClass(); + HLogFactory.resetLogWriterClass(); + conf.setClass("hbase.regionserver.hlog.reader.impl", ProtobufLogReader.class, + HLog.Reader.class); + conf.setClass("hbase.regionserver.hlog.writer.impl", SecureProtobufLogWriter.class, + HLog.Writer.class); + FileSystem fs 
= TEST_UTIL.getTestFileSystem(); + Path walPath = writeWAL("testHLogReaderOnSecureHLog"); + + // Ensure edits are not plaintext + long length = fs.getFileStatus(walPath).getLen(); + FSDataInputStream in = fs.open(walPath); + byte[] fileData = new byte[(int)length]; + IOUtils.readFully(in, fileData); + in.close(); + assertFalse("Cells appear to be plaintext", Bytes.contains(fileData, value)); + + // Confirm the WAL cannot be read back by ProtobufLogReader + try { + HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf); + assertFalse(true); + } catch (IOException ioe) { + // expected IOE + } + + FileStatus[] listStatus = fs.listStatus(walPath.getParent()); + RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); + Path rootdir = FSUtils.getRootDir(conf); + try { + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode); + s.splitLogFile(listStatus[0], null); + Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), + "corrupt"); + assertTrue(fs.exists(file)); + // assertFalse("log splitting should have failed", true); + } catch (IOException ioe) { + assertTrue("WAL should have been sidelined", false); + } + } + + @Test() + public void testSecureHLogReaderOnHLog() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + HLogFactory.resetLogReaderClass(); + HLogFactory.resetLogWriterClass(); + conf.setClass("hbase.regionserver.hlog.reader.impl", SecureProtobufLogReader.class, + HLog.Reader.class); + conf.setClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, + HLog.Writer.class); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path walPath = writeWAL("testSecureHLogReaderOnHLog"); + + // Ensure edits are plaintext + long length = fs.getFileStatus(walPath).getLen(); + FSDataInputStream in = fs.open(walPath); + byte[] fileData = new byte[(int)length]; + 
IOUtils.readFully(in, fileData); + in.close(); + assertTrue("Cells should be plaintext", Bytes.contains(fileData, value)); + + // Confirm the WAL can be read back by SecureProtobufLogReader + try { + HLog.Reader reader = HLogFactory.createReader(TEST_UTIL.getTestFileSystem(), walPath, conf); + } catch (IOException ioe) { + assertFalse(true); + } + + FileStatus[] listStatus = fs.listStatus(walPath.getParent()); + RecoveryMode mode = (conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, false) ? + RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING); + Path rootdir = FSUtils.getRootDir(conf); + try { + HLogSplitter s = new HLogSplitter(conf, rootdir, fs, null, null, mode); + s.splitLogFile(listStatus[0], null); + Path file = new Path(ZKSplitLog.getSplitLogDir(rootdir, listStatus[0].getPath().getName()), + "corrupt"); + assertTrue(!fs.exists(file)); + } catch (IOException ioe) { + assertTrue("WAL should have been sidelined", false); + } + } +}