From 9575e7ca60dd6b6375836a171486ebacd2939a43 Mon Sep 17 00:00:00 2001
From: Mike Drob
Date: Mon, 16 Jul 2018 12:21:33 -0500
Subject: [PATCH] HBASE-20894 Use proto for BucketCache persistence

---
 .../src/main/protobuf/BucketCacheEntry.proto       |  82 ++++++++++
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 172 +++++++++++++++------
 2 files changed, 207 insertions(+), 47 deletions(-)
 create mode 100644 hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto

diff --git a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
new file mode 100644
index 0000000000..f0f11e3532
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+syntax = "proto2";
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "BucketProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message BucketCacheEntry {
+  required int64 cache_capacity = 1;
+  required string io_class = 2;
+  required string map_class = 3;
+  required UniqueIndexMap deserializer_map = 4;
+  required BackingMap backing_map = 5;
+
+  message UniqueIndexMap {
+    map<int32, int32> forward_map = 5;
+  }
+  message BackingMap {
+    repeated BackingMapEntry entry = 1;
+  }
+}
+
+message BackingMapEntry {
+  required BlockCacheKey key = 1;
+  required BucketEntry value = 2;
+
+  message BlockCacheKey {
+    required string hfilename = 1;
+    required int64 offset = 2;
+    required BlockType block_type = 3;
+    required bool isPrimaryReplicaBlock = 4;
+
+    enum BlockType {
+      data = 0;
+      encoded_data = 1;
+      leaf_index = 2;
+      bloom_chunk = 3;
+      meta = 4;
+      intermediate_index = 5;
+      root_index = 6;
+      file_info = 7;
+      general_bloom_meta = 8;
+      delete_family_bloom_meta = 9;
+      trailer = 10;
+      index_v1 = 11;
+    }
+  }
+
+  message BucketEntry {
+    required int64 offset = 1;
+    required int32 length = 2;
+    required int64 access_counter = 3;
+    required int32 deserialiserIndex = 4;
+    required BlockPriority priority = 5;
+
+    enum BlockPriority {
+      single = 0;
+      multi = 1;
+      memory = 2;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 40c0a00cb9..73a05016b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -20,13 +20,16 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.SequenceInputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -68,6 +71,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -1085,70 +1091,142 @@ public class BucketCache implements BlockCache, HeapSize {
 
   private void persistToFile() throws IOException {
     assert !cacheEnabled;
-    FileOutputStream fos = null;
-    ObjectOutputStream oos = null;
-    try {
-      if (!ioEngine.isPersistent()) {
-        throw new IOException("Attempt to persist non-persistent cache mappings!");
-      }
-      fos = new FileOutputStream(persistencePath, false);
-      oos = new ObjectOutputStream(fos);
-      oos.writeLong(cacheCapacity);
-      oos.writeUTF(ioEngine.getClass().getName());
-      oos.writeUTF(backingMap.getClass().getName());
-      oos.writeObject(deserialiserMap);
-      oos.writeObject(backingMap);
-    } finally {
-      if (oos != null) oos.close();
-      if (fos != null) fos.close();
+    if (!ioEngine.isPersistent()) {
+      throw new IOException("Attempt to persist non-persistent cache mappings!");
     }
+    try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) {
+      fos.write(ProtobufMagic.PB_MAGIC);
+      BucketProtos.BucketCacheEntry.newBuilder()
+        .setCacheCapacity(cacheCapacity)
+        .setIoClass(ioEngine.getClass().getName())
+        .setMapClass(backingMap.getClass().getName())
+        .setDeserializerMap(deserialiserMapAsPB())
+        .setBackingMap(backingMapAsPb())
+        .build()
+        .writeTo(fos); // when to use writeTo v writeDelimitedTo?
+    }
+  }
+
+  private BucketProtos.BucketCacheEntry.UniqueIndexMap.Builder deserialiserMapAsPB() {
+    BucketProtos.BucketCacheEntry.UniqueIndexMap.Builder x =
+        BucketProtos.BucketCacheEntry.UniqueIndexMap.newBuilder();
+    for (Map.Entry<Integer, Integer> entry : deserialiserMap.mForwardMap.entrySet()) {
+      x.putForwardMap(entry.getKey(), entry.getValue());
+    }
+    return x;
+  }
+
+  private BucketProtos.BucketCacheEntry.BackingMap.Builder backingMapAsPb() {
+    BucketProtos.BucketCacheEntry.BackingMap.Builder x =
+        BucketProtos.BucketCacheEntry.BackingMap.newBuilder();
+    for (Map.Entry<BlockCacheKey, BucketEntry> entry : backingMap.entrySet()) {
+      x.addEntry(BucketProtos.BackingMapEntry.newBuilder()
+          .setKey(BucketProtos.BackingMapEntry.BlockCacheKey.newBuilder()
+              .setHfilename(entry.getKey().getHfileName())
+              .setOffset(entry.getKey().getOffset())
+              .setIsPrimaryReplicaBlock(entry.getKey().isPrimary())
+              // .setBlockType() TODO: figure out enums
+              .build())
+          .setValue(BucketProtos.BackingMapEntry.BucketEntry.newBuilder()
+              .setOffset(entry.getValue().offset())
+              .setLength(entry.getValue().getLength())
+              .setDeserialiserIndex(entry.getValue().deserialiserIndex)
+              // .setPriority() TODO: figure out enums
+              .build())
+          .build());
+    }
+    return x;
   }
 
-  @SuppressWarnings("unchecked")
-  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
-      ClassNotFoundException {
+  /**
+   * @see #persistToFile()
+   */
+  private void retrieveFromFile(int[] bucketSizes) throws IOException, ClassNotFoundException {
     File persistenceFile = new File(persistencePath);
     if (!persistenceFile.exists()) {
       return;
     }
     assert !cacheEnabled;
-    FileInputStream fis = null;
-    ObjectInputStream ois = null;
-    try {
-      if (!ioEngine.isPersistent())
-        throw new IOException(
-            "Attempt to restore non-persistent cache mappings!");
-      fis = new FileInputStream(persistencePath);
-      ois = new ObjectInputStream(fis);
+
+    try (FileInputStream in = new FileInputStream(persistencePath)) {
+      int pblen = ProtobufMagic.lengthOfPBMagic();
+      byte[] pbuf = new byte[pblen];
+      int read = in.read(pbuf);
+      if (read != pblen) throw new IOException("read=" + read + ", wanted=" + pblen);
+      if (ProtobufMagic.isPBMagicPrefix(pbuf)) {
+        parsePB(BucketProtos.BucketCacheEntry.parseFrom(in));
+      } else {
+        ByteArrayInputStream bais = new ByteArrayInputStream(pbuf);
+        SequenceInputStream sis = new SequenceInputStream(bais, in); // Concatenate input streams
+        parseSerializable(sis);
+      }
+      bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize);
+    } finally {
+      if (!persistenceFile.delete()) {
+        throw new IOException("Failed deleting persistence file "
+            + persistenceFile.getAbsolutePath());
+      }
+    }
+  }
+
+  private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass)
+      throws IOException {
+    if (capacitySize != cacheCapacity)
+      throw new IOException("Mismatched cache capacity:"
+          + StringUtils.byteDesc(capacitySize) + ", expected: "
+          + StringUtils.byteDesc(cacheCapacity));
+    if (!ioEngine.getClass().getName().equals(ioclass))
+      throw new IOException("Class name for IO engine mismatch: " + ioclass
+          + ", expected:" + ioEngine.getClass().getName());
+    if (!backingMap.getClass().getName().equals(mapclass))
+      throw new IOException("Class name for cache map mismatch: " + mapclass
+          + ", expected:" + backingMap.getClass().getName());
+  }
+
+  private void parsePB(BucketProtos.BucketCacheEntry proto) throws IOException {
+    verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass());
+
+    deserialiserMap = new UniqueIndexMap<>();
+    deserialiserMap.mForwardMap = new ConcurrentHashMap<>(
+        proto.getDeserializerMap().getForwardMapMap());
+    deserialiserMap.mReverseMap = new ConcurrentHashMap<>();
+    for (Map.Entry<Integer, Integer> fEntry : deserialiserMap.mForwardMap.entrySet()) {
+      deserialiserMap.mReverseMap.put(fEntry.getValue(), fEntry.getKey());
+    }
+    deserialiserMap.mIndex = new AtomicInteger(deserialiserMap.mForwardMap.size());
+
+    backingMap = new ConcurrentHashMap<>();
+    for (BucketProtos.BackingMapEntry entry : proto.getBackingMap().getEntryList()) {
+      BucketProtos.BackingMapEntry.BlockCacheKey protoKey = entry.getKey();
+      BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(),
+          protoKey.getIsPrimaryReplicaBlock(), fromPb(protoKey.getBlockType()));
+      BucketProtos.BackingMapEntry.BucketEntry protoValue = entry.getValue();
+      BucketEntry value = new BucketEntry(protoValue.getOffset(), protoValue.getLength(),
+          0, false); // TODO finish these arguments
+      backingMap.put(key, value);
+    }
+  }
+
+  private BlockType fromPb(BucketProtos.BackingMapEntry.BlockCacheKey.BlockType blockType) {
+    return null; // TODO
+  }
+
+  @SuppressWarnings("unchecked")
+  private void parseSerializable(InputStream in) throws IOException, ClassNotFoundException {
+    if (!ioEngine.isPersistent()) {
+      throw new IOException("Attempt to restore non-persistent cache mappings!");
+    }
+    try (ObjectInputStream ois = new ObjectInputStream(in)) {
       long capacitySize = ois.readLong();
-      if (capacitySize != cacheCapacity)
-        throw new IOException("Mismatched cache capacity:"
-            + StringUtils.byteDesc(capacitySize) + ", expected: "
-            + StringUtils.byteDesc(cacheCapacity));
       String ioclass = ois.readUTF();
       String mapclass = ois.readUTF();
-      if (!ioEngine.getClass().getName().equals(ioclass))
-        throw new IOException("Class name for IO engine mismatch: " + ioclass
-            + ", expected:" + ioEngine.getClass().getName());
-      if (!backingMap.getClass().getName().equals(mapclass))
-        throw new IOException("Class name for cache map mismatch: " + mapclass
-            + ", expected:" + backingMap.getClass().getName());
+      verifyCapacityAndClasses(capacitySize, ioclass, mapclass);
       UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
          .readObject();
       ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
          (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
-      BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
-          backingMapFromFile, realCacheSize);
-      bucketAllocator = allocator;
       deserialiserMap = deserMap;
       backingMap = backingMapFromFile;
-    } finally {
-      if (ois != null) ois.close();
-      if (fis != null) fis.close();
-      if (!persistenceFile.delete()) {
-        throw new IOException("Failed deleting persistence file "
-            + persistenceFile.getAbsolutePath());
-      }
     }
   }
 
-- 
2.16.1
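
The patch leaves the enum conversions open (the two "TODO: figure out enums" comments and the stubbed fromPb). Below is a minimal illustrative sketch, not part of the patch, of how those conversions could look, assuming the generated BucketProtos enum constants keep the proto value names (data, encoded_data, ..., single, multi, memory), which mirror the existing org.apache.hadoop.hbase.io.hfile.BlockType and BlockPriority constants in lower case. The BucketCacheEnumConversions class name is hypothetical.

// Illustrative sketch only; not part of the patch. Assumes the generated proto enum
// constants keep the proto value names, i.e. the lower-case forms of the existing
// org.apache.hadoop.hbase.io.hfile.BlockType and BlockPriority constants.
import java.util.Locale;

import org.apache.hadoop.hbase.io.hfile.BlockPriority;
import org.apache.hadoop.hbase.io.hfile.BlockType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketProtos;

final class BucketCacheEnumConversions { // hypothetical helper, name not from the patch
  private BucketCacheEnumConversions() {}

  // hfile BlockType <-> hbase.pb BlockType, mapped purely by name.
  static BucketProtos.BackingMapEntry.BlockCacheKey.BlockType toPb(BlockType type) {
    return BucketProtos.BackingMapEntry.BlockCacheKey.BlockType
        .valueOf(type.name().toLowerCase(Locale.ROOT));
  }

  static BlockType fromPb(BucketProtos.BackingMapEntry.BlockCacheKey.BlockType type) {
    return BlockType.valueOf(type.name().toUpperCase(Locale.ROOT));
  }

  // Same naming correspondence for BlockPriority (single/multi/memory).
  static BucketProtos.BackingMapEntry.BucketEntry.BlockPriority toPb(BlockPriority priority) {
    return BucketProtos.BackingMapEntry.BucketEntry.BlockPriority
        .valueOf(priority.name().toLowerCase(Locale.ROOT));
  }

  static BlockPriority fromPb(BucketProtos.BackingMapEntry.BucketEntry.BlockPriority priority) {
    return BlockPriority.valueOf(priority.name().toUpperCase(Locale.ROOT));
  }
}

A name-based mapping like this avoids a 12-case switch, but it silently depends on the two enums staying in lock step; an explicit switch (or storing the numeric value) would fail faster if either side changes.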