From 060392f145952e15d1ddbe3a7af279f851b90e25 Mon Sep 17 00:00:00 2001
From: Mike Drob
Date: Mon, 16 Jul 2018 12:21:33 -0500
Subject: [PATCH] HBASE-20894 Use proto for BucketCache persistence

---
 .../src/main/protobuf/BucketCacheEntry.proto       |  84 +++++++++
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 143 +++++++++------
 .../hbase/io/hfile/bucket/BucketProtoUtils.java    | 191 +++++++++++++++++++++
 3 files changed, 367 insertions(+), 51 deletions(-)
 create mode 100644 hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java

diff --git a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
new file mode 100644
index 0000000000..102ee997fd
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "BucketProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message BucketCacheEntry {
+  required int64 cache_capacity = 1;
+  required string io_class = 2;
+  required string map_class = 3;
+  required UniqueIndexMap deserializer_map = 4;
+  required BackingMap backing_map = 5;
+}
+
+message UniqueIndexMap {
+  map<int32, int32> forward_map = 5;
+}
+
+message BackingMap {
+  repeated BackingMapEntry entry = 1;
+}
+
+message BackingMapEntry {
+  required BlockCacheKey key = 1;
+  required BucketEntry value = 2;
+}
+
+message BlockCacheKey {
+  required string hfilename = 1;
+  required int64 offset = 2;
+  required BlockType block_type = 3;
+  required bool isPrimaryReplicaBlock = 4;
+
+}
+
+enum BlockType {
+  data = 0;
+  encoded_data = 1;
+  leaf_index = 2;
+  bloom_chunk = 3;
+  meta = 4;
+  intermediate_index = 5;
+  root_index = 6;
+  file_info = 7;
+  general_bloom_meta = 8;
+  delete_family_bloom_meta = 9;
+  trailer = 10;
+  index_v1 = 11;
+}
+
+message BucketEntry {
+  required int64 offset = 1;
+  required int32 length = 2;
+  required int64 access_counter = 3;
+  required int32 deserialiserIndex = 4;
+  required BlockPriority priority = 5;
+}
+
+enum BlockPriority {
+  single = 0;
+  multi = 1;
+  memory = 2;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 40c0a00cb9..abc407efbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -20,13 +20,15 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.SequenceInputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -68,6 +70,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -164,7 +169,7 @@ public class BucketCache implements BlockCache, HeapSize {
   private volatile boolean freeInProgress = false;
   private final Lock freeSpaceLock = new ReentrantLock();
 
-  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
+  UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
 
   private final LongAdder realCacheSize = new LongAdder();
   private final LongAdder heapSize = new LongAdder();
@@ -1083,72 +1088,108 @@ public class BucketCache implements BlockCache, HeapSize {
     return receptacle;
   }
 
+  /**
+   * @see #retrieveFromFile(int[])
+   */
   private void persistToFile() throws IOException {
     assert !cacheEnabled;
-    FileOutputStream fos = null;
-    ObjectOutputStream oos = null;
-    try {
-      if (!ioEngine.isPersistent()) {
-        throw new IOException("Attempt to persist non-persistent cache mappings!");
-      }
-      fos = new FileOutputStream(persistencePath, false);
-      oos = new ObjectOutputStream(fos);
-      oos.writeLong(cacheCapacity);
-      oos.writeUTF(ioEngine.getClass().getName());
-      oos.writeUTF(backingMap.getClass().getName());
-      oos.writeObject(deserialiserMap);
-      oos.writeObject(backingMap);
-    } finally {
-      if (oos != null) oos.close();
-      if (fos != null) fos.close();
+    if (!ioEngine.isPersistent()) {
+      throw new IOException("Attempt to persist non-persistent cache mappings!");
+    }
+    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
+      fos.write(ProtobufMagic.PB_MAGIC);
+      BucketProtoUtils.toPB(this).writeTo(fos); // when to use writeTo v writeDelimitedTo?
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
-      ClassNotFoundException {
+  /**
+   * @see #persistToFile()
+   */
+  private void retrieveFromFile(int[] bucketSizes) throws IOException, ClassNotFoundException {
     File persistenceFile = new File(persistencePath);
     if (!persistenceFile.exists()) {
       return;
     }
     assert !cacheEnabled;
-    FileInputStream fis = null;
-    ObjectInputStream ois = null;
-    try {
-      if (!ioEngine.isPersistent())
-        throw new IOException(
-            "Attempt to restore non-persistent cache mappings!");
-      fis = new FileInputStream(persistencePath);
-      ois = new ObjectInputStream(fis);
+
+    try (FileInputStream in = deleteFileOnClose(persistenceFile)) {
+      int pblen = ProtobufMagic.lengthOfPBMagic();
+      byte[] pbuf = new byte[pblen];
+      int read = in.read(pbuf);
+      if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
+      if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
+        parsePB(BucketProtos.BucketCacheEntry.parseFrom(in));
+      } else { // TODO toggle ability to read old format behind a configuration flag
+        ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
+        SequenceInputStream sis = new SequenceInputStream(bais, in);
+        parseSerializable(sis);
+      }
+      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
+    }
+  }
+
+  /**
+   * Create an input stream that deletes the file after reading it. Use in try-with-resources to
+   * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions:
+   * <pre>
+   *   File f = ...
+   *   try (FileInputStream fis = new FileInputStream(f)) {
+   *     // use the input stream
+   *   } finally {
+   *     if (!f.delete()) throw new IOException("failed to delete");
+   *   }
+   * </pre>
+   * @param file the file to read and delete
+   * @return a FileInputStream for the given file
+   * @throws IOException if there is a problem creating the stream
+   */
+  private FileInputStream deleteFileOnClose(final File file) throws IOException {
+    return new FileInputStream(file) {
+      @Override
+      public void close() throws IOException {
+        super.close();
+        if (!file.delete()) {
+          throw new IOException("Failed deleting persistence file " + file.getAbsolutePath());
+        }
+      }
+    };
+  }
+
+  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
+      throws IOException {
+    if (capacitySize != cacheCapacity)
+      throw new IOException("Mismatched cache capacity:"
+          + StringUtils.byteDesc(capacitySize) + ", expected: "
+          + StringUtils.byteDesc(cacheCapacity));
+    if (!ioEngine.getClass().getName().equals(ioclass))
+      throw new IOException("Class name for IO engine mismatch: " + ioclass
+          + ", expected:" + ioEngine.getClass().getName());
+    if (!backingMap.getClass().getName().equals(mapclass))
+      throw new IOException("Class name for cache map mismatch: " + mapclass
+          + ", expected:" + backingMap.getClass().getName());
+  }
+
+  private void parsePB(BucketProtos.BucketCacheEntry proto) throws IOException {
+    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
+    deserialiserMap = BucketProtoUtils.fromPB(proto.getDeserializerMap());
+    backingMap = BucketProtoUtils.fromPB(deserialiserMap, proto.getBackingMap());
+  }
+
+  @SuppressWarnings("unchecked")
+  private void parseSerializable(InputStream in) throws IOException, ClassNotFoundException {
+    if (!ioEngine.isPersistent()) {
+      throw new IOException("Attempt to restore non-persistent cache mappings!");
+    }
+    try (ObjectInputStream ois = new ObjectInputStream(in)) {
       long capacitySize = ois.readLong();
-      if (capacitySize != cacheCapacity)
-        throw new IOException("Mismatched cache capacity:"
-            + StringUtils.byteDesc(capacitySize) + ", expected: "
-            + StringUtils.byteDesc(cacheCapacity));
       String ioclass = ois.readUTF();
       String mapclass = ois.readUTF();
-      if (!ioEngine.getClass().getName().equals(ioclass))
-        throw new IOException("Class name for IO engine mismatch: " + ioclass
-            + ", expected:" + ioEngine.getClass().getName());
-      if (!backingMap.getClass().getName().equals(mapclass))
-        throw new IOException("Class name for cache map mismatch: " + mapclass
-            + ", expected:" + backingMap.getClass().getName());
-      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
-          .readObject();
+      verifyCapacityAndClasses(capacitySize, ioclass, mapclass);
+      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois.readObject();
       ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
           (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
-      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
-          backingMapFromFile, realCacheSize);
-      bucketAllocator = allocator;
       deserialiserMap = deserMap;
       backingMap = backingMapFromFile;
-    } finally {
-      if (ois != null) ois.close();
-      if (fis != null) fis.close();
-      if (!persistenceFile.delete()) {
-        throw new IOException("Failed deleting persistence file "
-            + persistenceFile.getAbsolutePath());
-      }
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
new file mode 100644
index 0000000000..8aa159c390
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockPriority;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+final class BucketProtoUtils {
+  private BucketProtoUtils() {
+
+  }
+
+  static BucketProtos.BucketCacheEntry toPB(BucketCache cache) {
+    return BucketProtos.BucketCacheEntry.newBuilder()
+        .setCacheCapacity(cache.getMaxSize())
+        .setIoClass(cache.ioEngine.getClass().getName())
+        .setMapClass(cache.backingMap.getClass().getName())
+        .setDeserializerMap(BucketProtoUtils.toPB(cache.deserialiserMap))
+        .setBackingMap(BucketProtoUtils.toPB(cache.backingMap))
+        .build();
+  }
+
+  private static BucketProtos.UniqueIndexMap toPB(UniqueIndexMap<Integer> deserialiserMap) {
+    BucketProtos.UniqueIndexMap.Builder builder = BucketProtos.UniqueIndexMap.newBuilder();
+    for (Map.Entry<Integer, Integer> entry : deserialiserMap.mForwardMap.entrySet()) {
+      builder.putForwardMap(entry.getKey(), entry.getValue());
+    }
+    return builder.build();
+  }
+
+  private static BucketProtos.BackingMap toPB(
+      Map<BlockCacheKey, BucketCache.BucketEntry> backingMap) {
+    BucketProtos.BackingMap.Builder builder = BucketProtos.BackingMap.newBuilder();
+    for (Map.Entry<BlockCacheKey, BucketCache.BucketEntry> entry : backingMap.entrySet()) {
+      builder.addEntry(BucketProtos.BackingMapEntry.newBuilder()
+          .setKey(toPB(entry.getKey()))
+          .setValue(toPB(entry.getValue()))
+          .build());
+    }
+    return builder.build();
+  }
+
+  private static BucketProtos.BlockCacheKey toPB(BlockCacheKey key) {
+    return BucketProtos.BlockCacheKey.newBuilder()
+        .setHfilename(key.getHfileName())
+        .setOffset(key.getOffset())
+        .setIsPrimaryReplicaBlock(key.isPrimary())
+        .setBlockType(toPB(key.getBlockType()))
+        .build();
+  }
+
+  private static BucketProtos.BlockType toPB(BlockType blockType) {
+    switch(blockType) {
+      case DATA:
+        return BucketProtos.BlockType.data;
+      case META:
+        return BucketProtos.BlockType.meta;
+      case TRAILER:
+        return BucketProtos.BlockType.trailer;
+      case INDEX_V1:
+        return BucketProtos.BlockType.index_v1;
+      case FILE_INFO:
+        return BucketProtos.BlockType.file_info;
+      case LEAF_INDEX:
+        return BucketProtos.BlockType.leaf_index;
+      case ROOT_INDEX:
+        return BucketProtos.BlockType.root_index;
+      case BLOOM_CHUNK:
+        return BucketProtos.BlockType.bloom_chunk;
+      case ENCODED_DATA:
+        return BucketProtos.BlockType.encoded_data;
+      case GENERAL_BLOOM_META:
+        return BucketProtos.BlockType.general_bloom_meta;
+      case INTERMEDIATE_INDEX:
+        return BucketProtos.BlockType.intermediate_index;
+      case DELETE_FAMILY_BLOOM_META:
+        return BucketProtos.BlockType.delete_family_bloom_meta;
+      default:
+        throw new Error("Unrecognized BlockType.");
+    }
+  }
+
+  private static BucketProtos.BucketEntry toPB(BucketCache.BucketEntry entry) {
+    return BucketProtos.BucketEntry.newBuilder()
+        .setOffset(entry.offset())
+        .setLength(entry.getLength())
+        .setDeserialiserIndex(entry.deserialiserIndex)
+        .setPriority(toPB(entry.getPriority()))
+        .build();
+  }
+
+  private static BucketProtos.BlockPriority toPB(BlockPriority p) {
+    switch (p) {
+      case MULTI:
+        return BucketProtos.BlockPriority.multi;
+      case MEMORY:
+        return BucketProtos.BlockPriority.memory;
+      case SINGLE:
+        return BucketProtos.BlockPriority.single;
+      default:
+        throw new Error("Unrecognized BlockPriority.");
+    }
+  }
+
+  static UniqueIndexMap<Integer> fromPB(BucketProtos.UniqueIndexMap deserializerMap) {
+    UniqueIndexMap<Integer> result = new UniqueIndexMap<>();
+    result.mForwardMap = new ConcurrentHashMap<>(deserializerMap.getForwardMapMap());
+    result.mReverseMap = new ConcurrentHashMap<>();
+    for (Map.Entry<Integer, Integer> fEntry : result.mForwardMap.entrySet()) {
+      result.mReverseMap.put(fEntry.getValue(), fEntry.getKey());
+    }
+    result.mIndex = new AtomicInteger(result.mForwardMap.size());
+    return result;
+  }
+
+  static ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> fromPB(
+      UniqueIndexMap<Integer> deserializerMap, BucketProtos.BackingMap backingMap) {
+    ConcurrentHashMap<BlockCacheKey, BucketCache.BucketEntry> result = new ConcurrentHashMap<>();
+    for (BucketProtos.BackingMapEntry entry : backingMap.getEntryList()) {
+      BucketProtos.BlockCacheKey protoKey = entry.getKey();
+      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
+          protoKey.getIsPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+      BucketProtos.BucketEntry protoValue = entry.getValue();
+      BucketCache.BucketEntry value = new BucketCache.BucketEntry(
+          protoValue.getOffset(),
+          protoValue.getLength(),
+          0,
+          protoValue.getPriority() == BucketProtos.BlockPriority.memory);
+      // TODO where do we find the deserializer?
+      // value.setDeserialiserReference(null, deserializerMap);
+      result.put(key, value);
+    }
+    return result;
+  }
+
+  private static BlockType fromPb(BucketProtos.BlockType blockType) {
+    switch (blockType) {
+      case data:
+        return BlockType.DATA;
+      case meta:
+        return BlockType.META;
+      case trailer:
+        return BlockType.TRAILER;
+      case index_v1:
+        return BlockType.INDEX_V1;
+      case file_info:
+        return BlockType.FILE_INFO;
+      case leaf_index:
+        return BlockType.LEAF_INDEX;
+      case root_index:
+        return BlockType.ROOT_INDEX;
+      case bloom_chunk:
+        return BlockType.BLOOM_CHUNK;
+      case encoded_data:
+        return BlockType.ENCODED_DATA;
+      case general_bloom_meta:
+        return BlockType.GENERAL_BLOOM_META;
+      case intermediate_index:
+        return BlockType.INTERMEDIATE_INDEX;
+      case delete_family_bloom_meta:
+        return BlockType.DELETE_FAMILY_BLOOM_META;
+      default:
+        throw new Error("Unrecognized BlockType.");
+    }
+  }
+}
-- 
2.16.1
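
For reference, a minimal, self-contained sketch of the on-disk layout this patch writes: the protobuf magic bytes followed by a single serialized BucketCacheEntry message, mirroring persistToFile() and retrieveFromFile() above. It relies only on ProtobufMagic and the generated BucketProtos classes from this patch; the temp-file path and the sample capacity and class-name values are illustrative, not taken from the patch.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketProtos;

public class BucketCachePersistenceSketch {
  public static void main(String[] args) throws IOException {
    // Illustrative location; BucketCache uses the configured persistencePath instead.
    File f = File.createTempFile("bucketcache", ".persistence");

    // Write side: magic prefix, then one BucketCacheEntry message (same layout as persistToFile()).
    BucketProtos.BucketCacheEntry entry = BucketProtos.BucketCacheEntry.newBuilder()
        .setCacheCapacity(1024L * 1024L)  // sample value; real code uses cacheCapacity
        .setIoClass("org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine")  // sample class name
        .setMapClass("java.util.concurrent.ConcurrentHashMap")               // sample class name
        .setDeserializerMap(BucketProtos.UniqueIndexMap.getDefaultInstance())
        .setBackingMap(BucketProtos.BackingMap.getDefaultInstance())
        .build();
    try (FileOutputStream fos = new FileOutputStream(f, false)) {
      fos.write(ProtobufMagic.PB_MAGIC);
      entry.writeTo(fos);
    }

    // Read side: verify the magic prefix, then parse the remainder of the stream
    // (the same check retrieveFromFile() uses to pick the proto path over the old format).
    try (FileInputStream in = new FileInputStream(f)) {
      byte[] pbuf = new byte[ProtobufMagic.lengthOfPBMagic()];
      if (in.read(pbuf) != pbuf.length || !ProtobufMagic.isPBMagicPrefix(pbuf)) {
        throw new IOException("Not a protobuf-format persistence file");
      }
      BucketProtos.BucketCacheEntry parsed = BucketProtos.BucketCacheEntry.parseFrom(in);
      System.out.println("capacity=" + parsed.getCacheCapacity()
          + ", ioClass=" + parsed.getIoClass());
    }
  }
}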