From 13fb4e70822d10628b00e72ab4be55267a20ad30 Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Mon, 16 Jul 2018 12:21:33 -0500 Subject: [PATCH] HBASE-20894 Use proto for BucketCache persistence --- .../src/main/protobuf/BucketCacheEntry.proto | 79 +++++++++ .../io/hfile/CacheableDeserializerIdManager.java | 34 +++- .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 13 +- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 180 ++++++++++--------- .../hbase/io/hfile/bucket/BucketProtoUtils.java | 193 +++++++++++++++++++++ .../hbase/io/hfile/bucket/UniqueIndexMap.java | 56 ------ .../hadoop/hbase/io/hfile/CacheTestUtils.java | 1 + .../hbase/io/hfile/bucket/TestBucketCache.java | 28 +-- .../io/hfile/bucket/TestBucketWriterThread.java | 6 +- 9 files changed, 427 insertions(+), 163 deletions(-) create mode 100644 hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java diff --git a/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto new file mode 100644 index 0000000000..6ecb79bf53 --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/BucketCacheEntry.proto @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+syntax = "proto2";
+
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "BucketCacheProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message BucketCacheEntry {
+  required int64 cache_capacity = 1;
+  required string io_class = 2;
+  required string map_class = 3;
+  map<int32, string> deserializers = 4;
+  required BackingMap backing_map = 5;
+}
+
+message BackingMap {
+  repeated BackingMapEntry entry = 1;
+}
+
+message BackingMapEntry {
+  required BlockCacheKey key = 1;
+  required BucketEntry value = 2;
+}
+
+message BlockCacheKey {
+  required string hfilename = 1;
+  required int64 offset = 2;
+  required BlockType block_type = 3;
+  required bool isPrimaryReplicaBlock = 4;
+}
+
+enum BlockType {
+  data = 0;
+  encoded_data = 1;
+  leaf_index = 2;
+  bloom_chunk = 3;
+  meta = 4;
+  intermediate_index = 5;
+  root_index = 6;
+  file_info = 7;
+  general_bloom_meta = 8;
+  delete_family_bloom_meta = 9;
+  trailer = 10;
+  index_v1 = 11;
+}
+
+message BucketEntry {
+  required int64 offset = 1;
+  required int32 length = 2;
+  required int64 access_counter = 3;
+  required int32 deserialiserIndex = 4;
+  required BlockPriority priority = 5;
+}
+
+enum BlockPriority {
+  single = 0;
+  multi = 1;
+  memory = 2;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
index b1ed77dfac..bcc29c2fda 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
@@ -25,8 +25,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * This class is used to manage the identifiers for
- * {@link CacheableDeserializer}
+ * This class is used to manage the identifiers for {@link CacheableDeserializer}.
+ * All deserializers are registered with this manager via
+ * {@link #registerDeserializer(CacheableDeserializer)}. On registration, we return an
+ * int *identifier* for this deserializer. The int identifier is passed to
+ * {@link #getDeserializer(int)} to obtain the registered deserializer instance.
  */
 @InterfaceAudience.Private
 public class CacheableDeserializerIdManager {
@@ -34,10 +37,11 @@ public class CacheableDeserializerIdManager {
   private static final AtomicInteger identifier = new AtomicInteger(0);
 
   /**
-   * Register the given cacheable deserializer and generate an unique identifier
-   * id for it
-   * @param cd
+   * Register the given {@link Cacheable} deserializer -- usually for an HFileBlock instance;
+   * these implement the Cacheable interface -- and generate a unique identifier id for it,
+   * returning the identifier as our result.
    * @return the identifier of given cacheable deserializer
+   * @see #getDeserializer(int)
    */
   public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
     int idx = identifier.incrementAndGet();
@@ -48,11 +52,25 @@ public class CacheableDeserializerIdManager {
   }
 
   /**
-   * Get the cacheable deserializer as the given identifier Id
-   * @param id
-   * @return CacheableDeserializer
+   * Get the cacheable deserializer registered at the given identifier Id.
+   * @see #registerDeserializer(CacheableDeserializer)
    */
   public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
     return registeredDeserializers.get(id);
   }
+
+  /**
+   * Snapshot a map of the current identifiers to class names for reconstruction on reading out
+   * of a file.
+   */
+  public static Map<Integer, String> save() {
+    Map<Integer, String> snapshot = new HashMap<>();
+    synchronized (registeredDeserializers) {
+      for (Map.Entry<Integer, CacheableDeserializer<Cacheable>> entry :
+          registeredDeserializers.entrySet()) {
+        snapshot.put(entry.getKey(), entry.getValue().getClass().getName());
+      }
+    }
+    return snapshot;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 238cd70bc9..6b0c13badc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -251,10 +252,14 @@ public class HFileBlock implements Cacheable {
    * + Metadata! + <= See note on BLOCK_METADATA_SPACE above.
    * ++++++++++++++
    *
-   * @see #serialize(ByteBuffer)
+   * @see #serialize(ByteBuffer, boolean)
    */
-  static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER =
-      new CacheableDeserializer<Cacheable>() {
+  public static final CacheableDeserializer<Cacheable> BLOCK_DESERIALIZER = new BlockDeserializer();
+
+  public static final class BlockDeserializer implements CacheableDeserializer<Cacheable> {
+    private BlockDeserializer() {
+    }
+
     @Override
     public HFileBlock deserialize(ByteBuff buf, boolean reuse, MemoryType memType)
         throws IOException {
@@ -291,7 +296,7 @@ public class HFileBlock implements Cacheable {
       // Used only in tests
       return deserialize(b, false, MemoryType.EXCLUSIVE);
     }
-  };
+  }
 
   private static final int DESERIALIZER_IDENTIFIER;
   static {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 40c0a00cb9..809c8de005 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -25,8 +25,6 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -68,6 +66,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -78,6 +77,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import
org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -164,8 +164,6 @@ public class BucketCache implements BlockCache, HeapSize { private volatile boolean freeInProgress = false; private final Lock freeSpaceLock = new ReentrantLock(); - private UniqueIndexMap deserialiserMap = new UniqueIndexMap<>(); - private final LongAdder realCacheSize = new LongAdder(); private final LongAdder heapSize = new LongAdder(); /** Current number of cached elements */ @@ -301,7 +299,7 @@ public class BucketCache implements BlockCache, HeapSize { } catch (IOException ioex) { LOG.error("Can't restore from file because of", ioex); } catch (ClassNotFoundException cnfe) { - LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe); + LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe); throw new RuntimeException(cnfe); } } @@ -511,7 +509,7 @@ public class BucketCache implements BlockCache, HeapSize { LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len); } Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len, - bucketEntry.deserializerReference(this.deserialiserMap)); + bucketEntry.deserializerReference()); long timeTaken = System.nanoTime() - start; if (updateCacheMetrics) { cacheStats.hit(caching, key.isPrimary(), key.getBlockType()); @@ -988,7 +986,7 @@ public class BucketCache implements BlockCache, HeapSize { continue; } BucketEntry bucketEntry = - re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize); + re.writeToCache(ioEngine, bucketAllocator, realCacheSize); // Successfully added. Up index and add bucketEntry. Clear io exceptions. bucketEntries[index] = bucketEntry; if (ioErrorStartTime > 0) { @@ -1083,75 +1081,98 @@ public class BucketCache implements BlockCache, HeapSize { return receptacle; } + /** + * @see #retrieveFromFile(int[]) + */ private void persistToFile() throws IOException { assert !cacheEnabled; - FileOutputStream fos = null; - ObjectOutputStream oos = null; - try { - if (!ioEngine.isPersistent()) { - throw new IOException("Attempt to persist non-persistent cache mappings!"); - } - fos = new FileOutputStream(persistencePath, false); - oos = new ObjectOutputStream(fos); - oos.writeLong(cacheCapacity); - oos.writeUTF(ioEngine.getClass().getName()); - oos.writeUTF(backingMap.getClass().getName()); - oos.writeObject(deserialiserMap); - oos.writeObject(backingMap); - } finally { - if (oos != null) oos.close(); - if (fos != null) fos.close(); + if (!ioEngine.isPersistent()) { + throw new IOException("Attempt to persist non-persistent cache mappings!"); + } + try (FileOutputStream fos = new FileOutputStream(persistencePath, false)) { + fos.write(ProtobufMagic.PB_MAGIC); + BucketProtoUtils.toPB(this).writeDelimitedTo(fos); } } - @SuppressWarnings("unchecked") - private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException, - ClassNotFoundException { + /** + * @see #persistToFile() + */ + private void retrieveFromFile(int[] bucketSizes) throws IOException, ClassNotFoundException { File persistenceFile = new File(persistencePath); if (!persistenceFile.exists()) { return; } assert !cacheEnabled; - FileInputStream fis = null; - ObjectInputStream ois = null; - try { - if (!ioEngine.isPersistent()) - throw new IOException( - "Attempt to restore non-persistent cache mappings!"); - fis = new FileInputStream(persistencePath); - ois = new ObjectInputStream(fis); - long 
capacitySize = ois.readLong(); - if (capacitySize != cacheCapacity) - throw new IOException("Mismatched cache capacity:" - + StringUtils.byteDesc(capacitySize) + ", expected: " - + StringUtils.byteDesc(cacheCapacity)); - String ioclass = ois.readUTF(); - String mapclass = ois.readUTF(); - if (!ioEngine.getClass().getName().equals(ioclass)) - throw new IOException("Class name for IO engine mismatch: " + ioclass - + ", expected:" + ioEngine.getClass().getName()); - if (!backingMap.getClass().getName().equals(mapclass)) - throw new IOException("Class name for cache map mismatch: " + mapclass - + ", expected:" + backingMap.getClass().getName()); - UniqueIndexMap deserMap = (UniqueIndexMap) ois - .readObject(); - ConcurrentHashMap backingMapFromFile = - (ConcurrentHashMap) ois.readObject(); - BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes, - backingMapFromFile, realCacheSize); - bucketAllocator = allocator; - deserialiserMap = deserMap; - backingMap = backingMapFromFile; - } finally { - if (ois != null) ois.close(); - if (fis != null) fis.close(); - if (!persistenceFile.delete()) { - throw new IOException("Failed deleting persistence file " - + persistenceFile.getAbsolutePath()); + + try (FileInputStream in = deleteFileOnClose(persistenceFile)) { + int pblen = ProtobufMagic.lengthOfPBMagic(); + byte[] pbuf = new byte[pblen]; + int read = in.read(pbuf); + if (read != pblen) { + throw new IOException("Incorrect number of bytes read while checking for protobuf magic " + + "number. Requested=" + pblen + ", Received= " + read + ", File=" + persistencePath); } + if (! ProtobufMagic.isPBMagicPrefix(pbuf)) { + // In 3.0 we have enough flexibility to dump the old cache data. + // TODO: In 2.x line, this might need to be filled in to support reading the old format + throw new IOException("Persistence file does not start with protobuf magic number. " + + persistencePath); + } + parsePB(BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in)); + bucketAllocator = new BucketAllocator(cacheCapacity, bucketSizes, backingMap, realCacheSize); + } + } + + /** + * Create an input stream that deletes the file after reading it. Use in try-with-resources to + * avoid this pattern where an exception thrown from a finally block may mask earlier exceptions: + *
+   *   File f = ...
+   *   try (FileInputStream fis = new FileInputStream(f)) {
+   *     // use the input stream
+   *   } finally {
+   *     if (!f.delete()) throw new IOException("failed to delete");
+   *   }
+   * 
+ * @param file the file to read and delete + * @return a FileInputStream for the given file + * @throws IOException if there is a problem creating the stream + */ + private FileInputStream deleteFileOnClose(final File file) throws IOException { + return new FileInputStream(file) { + @Override + public void close() throws IOException { + super.close(); + if (!file.delete()) { + throw new IOException("Failed deleting persistence file " + file.getAbsolutePath()); + } + } + }; + } + + private void verifyCapacityAndClasses(long capacitySize, String ioclass, String mapclass) + throws IOException { + if (capacitySize != cacheCapacity) { + throw new IOException("Mismatched cache capacity:" + + StringUtils.byteDesc(capacitySize) + ", expected: " + + StringUtils.byteDesc(cacheCapacity)); + } + if (!ioEngine.getClass().getName().equals(ioclass)) { + throw new IOException("Class name for IO engine mismatch: " + ioclass + + ", expected:" + ioEngine.getClass().getName()); + } + if (!backingMap.getClass().getName().equals(mapclass)) { + throw new IOException("Class name for cache map mismatch: " + mapclass + + ", expected:" + backingMap.getClass().getName()); } } + private void parsePB(BucketCacheProtos.BucketCacheEntry proto) throws IOException { + verifyCapacityAndClasses(proto.getCacheCapacity(), proto.getIoClass(), proto.getMapClass()); + backingMap = BucketProtoUtils.fromPB(proto.getDeserializersMap(), proto.getBackingMap()); + } + /** * Check whether we tolerate IO error this time. If the duration of IOEngine * throwing errors exceeds ioErrorsDurationTimeTolerated, we will disable the @@ -1287,18 +1308,19 @@ public class BucketCache implements BlockCache, HeapSize { private static final long serialVersionUID = -6741504807982257534L; // access counter comparator, descending order - static final Comparator COMPARATOR = new Comparator() { - - @Override - public int compare(BucketEntry o1, BucketEntry o2) { - return Long.compare(o2.accessCounter, o1.accessCounter); - } - }; + static final Comparator COMPARATOR = Comparator + .comparingLong(BucketEntry::getAccessCounter).reversed(); private int offsetBase; private int length; private byte offset1; + + /** + * The index of the deserializer that can deserialize this BucketEntry content. + * See {@link CacheableDeserializerIdManager} for hosting of index to serializers. 
+ */ byte deserialiserIndex; + private volatile long accessCounter; private BlockPriority priority; @@ -1335,17 +1357,16 @@ public class BucketCache implements BlockCache, HeapSize { return length; } - protected CacheableDeserializer deserializerReference( - UniqueIndexMap deserialiserMap) { - return CacheableDeserializerIdManager.getDeserializer(deserialiserMap - .unmap(deserialiserIndex)); + protected CacheableDeserializer deserializerReference() { + return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex); } - protected void setDeserialiserReference( - CacheableDeserializer deserializer, - UniqueIndexMap deserialiserMap) { - this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer - .getDeserialiserIdentifier())); + protected void setDeserialiserReference(CacheableDeserializer deserializer) { + this.deserialiserIndex = (byte) deserializer.getDeserialiserIdentifier(); + } + + public long getAccessCounter() { + return accessCounter; } /** @@ -1504,7 +1525,6 @@ public class BucketCache implements BlockCache, HeapSize { public BucketEntry writeToCache(final IOEngine ioEngine, final BucketAllocator bucketAllocator, - final UniqueIndexMap deserialiserMap, final LongAdder realCacheSize) throws CacheFullException, IOException, BucketAllocatorException { int len = data.getSerializedLength(); @@ -1516,7 +1536,7 @@ public class BucketCache implements BlockCache, HeapSize { ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory) : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory) : new BucketEntry(offset, len, accessCounter, inMemory); - bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); + bucketEntry.setDeserialiserReference(data.getDeserializer()); try { if (data instanceof HFileBlock) { // If an instance of HFileBlock, save on some allocations. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java new file mode 100644 index 0000000000..a945a7005e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketProtoUtils.java @@ -0,0 +1,193 @@ +/* + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.io.hfile.bucket; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.BlockPriority; +import org.apache.hadoop.hbase.io.hfile.BlockType; +import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; +import org.apache.hadoop.hbase.io.hfile.HFileBlock; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos; + +@InterfaceAudience.Private +final class BucketProtoUtils { + private BucketProtoUtils() { + + } + + static BucketCacheProtos.BucketCacheEntry toPB(BucketCache cache) { + return BucketCacheProtos.BucketCacheEntry.newBuilder() + .setCacheCapacity(cache.getMaxSize()) + .setIoClass(cache.ioEngine.getClass().getName()) + .setMapClass(cache.backingMap.getClass().getName()) + .putAllDeserializers(CacheableDeserializerIdManager.save()) + .setBackingMap(BucketProtoUtils.toPB(cache.backingMap)) + .build(); + } + + private static BucketCacheProtos.BackingMap toPB( + Map backingMap) { + BucketCacheProtos.BackingMap.Builder builder = BucketCacheProtos.BackingMap.newBuilder(); + for (Map.Entry entry : backingMap.entrySet()) { + builder.addEntry(BucketCacheProtos.BackingMapEntry.newBuilder() + .setKey(toPB(entry.getKey())) + .setValue(toPB(entry.getValue())) + .build()); + } + return builder.build(); + } + + private static BucketCacheProtos.BlockCacheKey toPB(BlockCacheKey key) { + return BucketCacheProtos.BlockCacheKey.newBuilder() + .setHfilename(key.getHfileName()) + .setOffset(key.getOffset()) + .setIsPrimaryReplicaBlock(key.isPrimary()) + .setBlockType(toPB(key.getBlockType())) + .build(); + } + + private static BucketCacheProtos.BlockType toPB(BlockType blockType) { + switch(blockType) { + case DATA: + return BucketCacheProtos.BlockType.data; + case META: + return BucketCacheProtos.BlockType.meta; + case TRAILER: + return BucketCacheProtos.BlockType.trailer; + case INDEX_V1: + return BucketCacheProtos.BlockType.index_v1; + case FILE_INFO: + return BucketCacheProtos.BlockType.file_info; + case LEAF_INDEX: + return BucketCacheProtos.BlockType.leaf_index; + case ROOT_INDEX: + return BucketCacheProtos.BlockType.root_index; + case BLOOM_CHUNK: + return BucketCacheProtos.BlockType.bloom_chunk; + case ENCODED_DATA: + return BucketCacheProtos.BlockType.encoded_data; + case GENERAL_BLOOM_META: + return BucketCacheProtos.BlockType.general_bloom_meta; + case INTERMEDIATE_INDEX: + return BucketCacheProtos.BlockType.intermediate_index; + case DELETE_FAMILY_BLOOM_META: + return BucketCacheProtos.BlockType.delete_family_bloom_meta; + default: + throw new Error("Unrecognized BlockType."); + } + } + + private static BucketCacheProtos.BucketEntry toPB(BucketCache.BucketEntry entry) { + return BucketCacheProtos.BucketEntry.newBuilder() + .setOffset(entry.offset()) + .setLength(entry.getLength()) + .setDeserialiserIndex(entry.deserialiserIndex) + .setAccessCounter(entry.getAccessCounter()) + .setPriority(toPB(entry.getPriority())) + .build(); + } + + private static BucketCacheProtos.BlockPriority toPB(BlockPriority p) { + switch (p) { + case MULTI: + return BucketCacheProtos.BlockPriority.multi; + case MEMORY: + return BucketCacheProtos.BlockPriority.memory; + case SINGLE: + return BucketCacheProtos.BlockPriority.single; + default: + throw new 
Error("Unrecognized BlockPriority."); + } + } + + static ConcurrentHashMap fromPB( + Map deserializers, BucketCacheProtos.BackingMap backingMap) + throws IOException { + ConcurrentHashMap result = new ConcurrentHashMap<>(); + for (BucketCacheProtos.BackingMapEntry entry : backingMap.getEntryList()) { + BucketCacheProtos.BlockCacheKey protoKey = entry.getKey(); + BlockCacheKey key = new BlockCacheKey(protoKey.getHfilename(), protoKey.getOffset(), + protoKey.getIsPrimaryReplicaBlock(), fromPb(protoKey.getBlockType())); + BucketCacheProtos.BucketEntry protoValue = entry.getValue(); + BucketCache.BucketEntry value = new BucketCache.BucketEntry( + protoValue.getOffset(), + protoValue.getLength(), + protoValue.getAccessCounter(), + protoValue.getPriority() == BucketCacheProtos.BlockPriority.memory); + // This is the deserializer that we stored + int oldIndex = protoValue.getDeserialiserIndex(); + String deserializerClass = deserializers.get(oldIndex); + if (deserializerClass == null) { + throw new IOException("Found deserializer index without matching entry."); + } + // Convert it to the identifier for the deserializer that we have in this runtime + if (deserializerClass.equals(HFileBlock.BlockDeserializer.class.getName())) { + int actualIndex = HFileBlock.BLOCK_DESERIALIZER.getDeserialiserIdentifier(); + value.deserialiserIndex = (byte) actualIndex; + } else { + // We could make this more plugable, but right now HFileBlock is the only implementation + // of Cacheable outside of tests, so this might not ever matter. + throw new IOException("Unknown deserializer class."); + } + result.put(key, value); + } + return result; + } + + private static BlockType fromPb(BucketCacheProtos.BlockType blockType) { + switch (blockType) { + case data: + return BlockType.DATA; + case meta: + return BlockType.META; + case trailer: + return BlockType.TRAILER; + case index_v1: + return BlockType.INDEX_V1; + case file_info: + return BlockType.FILE_INFO; + case leaf_index: + return BlockType.LEAF_INDEX; + case root_index: + return BlockType.ROOT_INDEX; + case bloom_chunk: + return BlockType.BLOOM_CHUNK; + case encoded_data: + return BlockType.ENCODED_DATA; + case general_bloom_meta: + return BlockType.GENERAL_BLOOM_META; + case intermediate_index: + return BlockType.INTERMEDIATE_INDEX; + case delete_family_bloom_meta: + return BlockType.DELETE_FAMILY_BLOOM_META; + default: + throw new Error("Unrecognized BlockType."); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java deleted file mode 100644 index ec297a5a66..0000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.io.hfile.bucket; - -import java.io.Serializable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.yetus.audience.InterfaceAudience; - -/** - * Map from type T to int and vice-versa. Used for reducing bit field item - * counts. - */ -@InterfaceAudience.Private -public final class UniqueIndexMap implements Serializable { - private static final long serialVersionUID = -1145635738654002342L; - - ConcurrentHashMap mForwardMap = new ConcurrentHashMap<>(); - ConcurrentHashMap mReverseMap = new ConcurrentHashMap<>(); - AtomicInteger mIndex = new AtomicInteger(0); - - // Map a length to an index. If we can't, allocate a new mapping. We might - // race here and get two entries with the same deserialiser. This is fine. - int map(T parameter) { - Integer ret = mForwardMap.get(parameter); - if (ret != null) return ret.intValue(); - int nexti = mIndex.incrementAndGet(); - assert (nexti < Short.MAX_VALUE); - mForwardMap.put(parameter, nexti); - mReverseMap.put(nexti, parameter); - return nexti; - } - - T unmap(int leni) { - Integer len = Integer.valueOf(leni); - assert mReverseMap.containsKey(len); - return mReverseMap.get(len); - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index af28912d0e..3c4ae78577 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -287,6 +287,7 @@ public class CacheTestUtils { return deserializerIdentifier; } + @Override public Cacheable deserialize(ByteBuff b, boolean reuse, MemoryType memType) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 924dd02768..f4e4e53aa2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.io.hfile.bucket; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.ByteBuffer; @@ -246,11 +248,13 @@ public class TestBucketCache { Path testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); - BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir - + "/bucket.persistence"); + String ioEngineName = "file:" + testDir + "/bucket.cache"; + String persistencePath = testDir + "/bucket.persistence"; + + BucketCache 
bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); long usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize == 0); + assertEquals(0, usedSize); HFileBlockPair[] blocks = CacheTestUtils.generateHFileBlocks(constructedBlockSize, 1); // Add blocks @@ -261,24 +265,26 @@ public class TestBucketCache { cacheAndWaitUntilFlushedToBucket(bucketCache, block.getBlockName(), block.getBlock()); } usedSize = bucketCache.getAllocator().getUsedSize(); - assertTrue(usedSize != 0); + assertNotEquals(0, usedSize); // persist cache to file bucketCache.shutdown(); + assertTrue(new File(persistencePath).exists()); // restore cache from file - bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir - + "/bucket.persistence"); + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + constructedBlockSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); // persist cache to file bucketCache.shutdown(); + assertTrue(new File(persistencePath).exists()); // reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k) // so it can't restore cache from file int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 }; - bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, - constructedBlockSize, smallBucketSizes, writeThreads, - writerQLen, testDir + "/bucket.persistence"); + bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, + smallBucketSizes, writeThreads, writerQLen, persistencePath); + assertFalse(new File(persistencePath).exists()); assertEquals(0, bucketCache.getAllocator().getUsedSize()); assertEquals(0, bucketCache.backingMap.size()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index a694fcba16..f3ad135f3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -143,8 +143,7 @@ public class TestBucketWriterThread { RAMQueueEntry rqe = q.remove(); RAMQueueEntry spiedRqe = Mockito.spy(rqe); Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). - writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), - (UniqueIndexMap)Mockito.any(), (LongAdder) Mockito.any()); + writeToCache(Mockito.any(), Mockito.any(), Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); // Cache disabled when ioes w/o ever healing. @@ -166,8 +165,7 @@ public class TestBucketWriterThread { BucketEntry mockedBucketEntry = Mockito.mock(BucketEntry.class); Mockito.doThrow(cfe). doReturn(mockedBucketEntry). - when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), - (UniqueIndexMap)Mockito.any(), (LongAdder) Mockito.any()); + when(spiedRqe).writeToCache(Mockito.any(), Mockito.any(), Mockito.any()); this.q.add(spiedRqe); doDrainOfOneEntry(bc, wt, q); } -- 2.16.1
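
Reviewer note (not part of the patch): with this change, persistToFile() writes the standard HBase protobuf magic prefix followed by a single length-delimited BucketCacheProtos.BucketCacheEntry message, and retrieveFromFile() reads the same layout back. The following is a minimal standalone sketch for inspecting such a persistence file by hand; the class name and default path are hypothetical, and it assumes the generated BucketCacheProtos class and ProtobufMagic are on the classpath.

import java.io.FileInputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.shaded.protobuf.generated.BucketCacheProtos;

public final class BucketCachePersistenceDump {
  public static void main(String[] args) throws IOException {
    // Path to a file written by BucketCache#persistToFile(); hypothetical default location.
    String persistencePath = args.length > 0 ? args[0] : "/tmp/bucket.persistence";
    try (FileInputStream in = new FileInputStream(persistencePath)) {
      // The file starts with the HBase protobuf magic ("PBUF"), then one
      // length-delimited BucketCacheEntry message.
      byte[] magic = new byte[ProtobufMagic.lengthOfPBMagic()];
      if (in.read(magic) != magic.length || !ProtobufMagic.isPBMagicPrefix(magic)) {
        throw new IOException("Not a protobuf-format persistence file: " + persistencePath);
      }
      BucketCacheProtos.BucketCacheEntry entry =
          BucketCacheProtos.BucketCacheEntry.parseDelimitedFrom(in);
      System.out.println("capacity=" + entry.getCacheCapacity()
          + ", ioEngine=" + entry.getIoClass()
          + ", mapClass=" + entry.getMapClass()
          + ", deserializers=" + entry.getDeserializersMap()
          + ", cached blocks=" + entry.getBackingMap().getEntryCount());
    }
  }
}

A file that fails the magic check is a pre-3.0 java-serialization cache map; per the TODO in retrieveFromFile(), 3.0 simply discards it rather than attempting to read the old format.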