From 13ca4de23ef9e16d08be4d8c81f61d6bf8444c2d Mon Sep 17 00:00:00 2001
From: Michael Stack
Date: Sat, 28 Jul 2018 14:06:57 -0700
Subject: [PATCH] Write the CacheableDeserializerIdManager index into
 BucketEntries instead of the int we get via UniqueIndexMap; saves on a
 cryptic indirection. Purge UniqueIndexMap.

TODO: Finish up the serializing of CacheableDeserializerIdManager in
BucketCache.
---
 .../io/hfile/CacheableDeserializerIdManager.java   | 22 +++++----
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 38 +++++++--------
 .../hbase/io/hfile/bucket/UniqueIndexMap.java      | 56 ----------------------
 .../io/hfile/bucket/TestBucketWriterThread.java    |  4 +-
 4 files changed, 32 insertions(+), 88 deletions(-)
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
index b1ed77dfac..05329ebc63 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheableDeserializerIdManager.java
@@ -25,19 +25,24 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * This class is used to manage the identifiers for
- * {@link CacheableDeserializer}
+ * This class is used to manage the identifiers for {@link CacheableDeserializer}.
+ * All deserializers are registered with this Manager via
+ * {@link #registerDeserializer(CacheableDeserializer)}. On registration, we return an
+ * int *identifier* for the deserializer. The int identifier is then passed to
+ * {@link #getDeserializer(int)} to obtain the registered deserializer instance.
  */
 @InterfaceAudience.Private
 public class CacheableDeserializerIdManager {
-  private static final Map<Integer, CacheableDeserializer<Cacheable>> registeredDeserializers = new HashMap<>();
+  private static final Map<Integer, CacheableDeserializer<Cacheable>> registeredDeserializers =
+      new HashMap<>();
   private static final AtomicInteger identifier = new AtomicInteger(0);
 
   /**
-   * Register the given cacheable deserializer and generate an unique identifier
-   * id for it
-   * @param cd
+   * Register the given {@link Cacheable} deserializer -- usually the deserializer for an
+   * HFileBlock instance; HFileBlock implements the Cacheable interface -- generate a unique
+   * identifier for it, and return the identifier as our result.
    * @return the identifier of given cacheable deserializer
+   * @see #getDeserializer(int)
    */
   public static int registerDeserializer(CacheableDeserializer<Cacheable> cd) {
     int idx = identifier.incrementAndGet();
@@ -48,9 +53,8 @@ public class CacheableDeserializerIdManager {
   }
 
   /**
-   * Get the cacheable deserializer as the given identifier Id
-   * @param id
-   * @return CacheableDeserializer
+   * Get the cacheable deserializer registered at the given identifier Id.
+   * @see #registerDeserializer(CacheableDeserializer)
    */
   public static CacheableDeserializer<Cacheable> getDeserializer(int id) {
     return registeredDeserializers.get(id);
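
For illustration, the id manager round trip after this patch looks as below. Sketch only;
ToyDeserializer is a made-up stand-in for a real registrant such as the HFileBlock
deserializer.

  // Register once, usually when the Cacheable implementation sets up its deserializer.
  CacheableDeserializer<Cacheable> toy = new ToyDeserializer();
  int id = CacheableDeserializerIdManager.registerDeserializer(toy);
  // The returned id is what a BucketEntry now records; resolving it yields the instance.
  assert CacheableDeserializerIdManager.getDeserializer(id) == toy;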
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 40c0a00cb9..49893d74b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -164,8 +164,6 @@ public class BucketCache implements BlockCache, HeapSize {
   private volatile boolean freeInProgress = false;
   private final Lock freeSpaceLock = new ReentrantLock();
 
-  private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
-
   private final LongAdder realCacheSize = new LongAdder();
   private final LongAdder heapSize = new LongAdder();
   /** Current number of cached elements */
@@ -510,8 +508,8 @@ public class BucketCache implements BlockCache, HeapSize {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
           }
-          Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
-              bucketEntry.deserializerReference(this.deserialiserMap));
+          Cacheable cachedBlock =
+              ioEngine.read(bucketEntry.offset(), len, bucketEntry.deserializerReference());
           long timeTaken = System.nanoTime() - start;
           if (updateCacheMetrics) {
             cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
@@ -987,8 +985,7 @@ public class BucketCache implements BlockCache, HeapSize {
           index++;
           continue;
         }
-        BucketEntry bucketEntry =
-            re.writeToCache(ioEngine, bucketAllocator, deserialiserMap, realCacheSize);
+        BucketEntry bucketEntry = re.writeToCache(ioEngine, bucketAllocator, realCacheSize);
         // Successfully added. Up index and add bucketEntry. Clear io exceptions.
         bucketEntries[index] = bucketEntry;
         if (ioErrorStartTime > 0) {
@@ -1096,8 +1093,8 @@ public class BucketCache implements BlockCache, HeapSize {
       oos.writeLong(cacheCapacity);
       oos.writeUTF(ioEngine.getClass().getName());
       oos.writeUTF(backingMap.getClass().getName());
-      oos.writeObject(deserialiserMap);
       oos.writeObject(backingMap);
+      // TODO: Write out the CacheableDeserializerIdManager map: keys and deserializer class names.
     } finally {
       if (oos != null) oos.close();
       if (fos != null) fos.close();
@@ -1133,15 +1130,13 @@ public class BucketCache implements BlockCache, HeapSize {
       if (!backingMap.getClass().getName().equals(mapclass))
         throw new IOException("Class name for cache map mismatch: " + mapclass
             + ", expected:" + backingMap.getClass().getName());
-      UniqueIndexMap<Integer> deserMap = (UniqueIndexMap<Integer>) ois
-          .readObject();
       ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMapFromFile =
           (ConcurrentHashMap<BlockCacheKey, BucketEntry>) ois.readObject();
       BucketAllocator allocator = new BucketAllocator(cacheCapacity, bucketSizes,
           backingMapFromFile, realCacheSize);
       bucketAllocator = allocator;
-      deserialiserMap = deserMap;
       backingMap = backingMapFromFile;
+      // TODO: Read in the deserializer map and populate CacheableDeserializerIdManager with it.
     } finally {
       if (ois != null) ois.close();
       if (fis != null) fis.close();
@@ -1298,7 +1293,14 @@ public class BucketCache implements BlockCache, HeapSize {
     private int offsetBase;
     private int length;
     private byte offset1;
+
+    /**
+     * The index of the deserializer that can deserialize this BucketEntry content.
+     * See {@link CacheableDeserializerIdManager} for the mapping of index to deserializer.
+     */
     byte deserialiserIndex;
+
+
     private volatile long accessCounter;
     private BlockPriority priority;
 
@@ -1335,17 +1337,12 @@ public class BucketCache implements BlockCache, HeapSize {
       return length;
     }
 
-    protected CacheableDeserializer<Cacheable> deserializerReference(
-        UniqueIndexMap<Integer> deserialiserMap) {
-      return CacheableDeserializerIdManager.getDeserializer(deserialiserMap
-          .unmap(deserialiserIndex));
+    protected CacheableDeserializer<Cacheable> deserializerReference() {
+      return CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);
     }
 
-    protected void setDeserialiserReference(
-        CacheableDeserializer<Cacheable> deserializer,
-        UniqueIndexMap<Integer> deserialiserMap) {
-      this.deserialiserIndex = ((byte) deserialiserMap.map(deserializer
-          .getDeserialiserIdentifier()));
+    protected void setDeserialiserReference(CacheableDeserializer<Cacheable> deserializer) {
+      this.deserialiserIndex = (byte)deserializer.getDeserialiserIdentifier();
     }
 
     /**
@@ -1504,7 +1501,6 @@ public class BucketCache implements BlockCache, HeapSize {
 
     public BucketEntry writeToCache(final IOEngine ioEngine,
         final BucketAllocator bucketAllocator,
-        final UniqueIndexMap<Integer> deserialiserMap,
         final LongAdder realCacheSize)
         throws CacheFullException, IOException, BucketAllocatorException {
       int len = data.getSerializedLength();
@@ -1516,7 +1512,7 @@ public class BucketCache implements BlockCache, HeapSize {
           ? new UnsafeSharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
           : new SharedMemoryBucketEntry(offset, len, accessCounter, inMemory)
           : new BucketEntry(offset, len, accessCounter, inMemory);
-      bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap);
+      bucketEntry.setDeserialiserReference(data.getDeserializer());
       try {
         if (data instanceof HFileBlock) {
           // If an instance of HFileBlock, save on some allocations.
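
The two TODOs above want the id manager's state written alongside the backingMap. One
possible shape, assuming hypothetical save() and restore(Map) helpers on
CacheableDeserializerIdManager that snapshot and repopulate its identifier -> deserializer
class name mapping (neither helper exists in this patch; adding them is the outstanding
work named in the commit message):

  // In persistToFile: snapshot identifier -> deserializer class name and write it out.
  oos.writeObject(CacheableDeserializerIdManager.save());  // hypothetical helper

  // In retrieveFromFile: read the snapshot back and re-register under the old identifiers.
  @SuppressWarnings("unchecked")
  Map<Integer, String> ids = (Map<Integer, String>) ois.readObject();
  CacheableDeserializerIdManager.restore(ids);  // hypothetical helper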
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java
deleted file mode 100644
index ec297a5a66..0000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/UniqueIndexMap.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hadoop.hbase.io.hfile.bucket;
-
-import java.io.Serializable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Map from type T to int and vice-versa. Used for reducing bit field item
- * counts.
- */
-@InterfaceAudience.Private
-public final class UniqueIndexMap<T> implements Serializable {
-  private static final long serialVersionUID = -1145635738654002342L;
-
-  ConcurrentHashMap<T, Integer> mForwardMap = new ConcurrentHashMap<>();
-  ConcurrentHashMap<Integer, T> mReverseMap = new ConcurrentHashMap<>();
-  AtomicInteger mIndex = new AtomicInteger(0);
-
-  // Map a length to an index. If we can't, allocate a new mapping. We might
-  // race here and get two entries with the same deserialiser. This is fine.
-  int map(T parameter) {
-    Integer ret = mForwardMap.get(parameter);
-    if (ret != null) return ret.intValue();
-    int nexti = mIndex.incrementAndGet();
-    assert (nexti < Short.MAX_VALUE);
-    mForwardMap.put(parameter, nexti);
-    mReverseMap.put(nexti, parameter);
-    return nexti;
-  }
-
-  T unmap(int leni) {
-    Integer len = Integer.valueOf(leni);
-    assert mReverseMap.containsKey(len);
-    return mReverseMap.get(len);
-  }
-}
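
For reference, this deletion is what removes the cryptic indirection called out in the
subject line. Before, finding a deserializer took two lookups; now the byte a BucketEntry
stores is the id manager identifier itself (both lines drawn from the hunks above):

  // Before: BucketEntry held a UniqueIndexMap index that mapped to the identifier.
  CacheableDeserializerIdManager.getDeserializer(deserialiserMap.unmap(deserialiserIndex));
  // After: BucketEntry holds the identifier directly.
  CacheableDeserializerIdManager.getDeserializer(deserialiserIndex);

Either way the value must fit in the single byte a BucketEntry allots it, so the scheme
assumes only a handful of deserializers ever register.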
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index a694fcba16..00c8c0813f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -144,7 +144,7 @@ public class TestBucketWriterThread {
     RAMQueueEntry spiedRqe = Mockito.spy(rqe);
     Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
       writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
-        (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
+        (LongAdder) Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
     // Cache disabled when ioes w/o ever healing.
@@ -167,7 +167,7 @@ public class TestBucketWriterThread {
     Mockito.doThrow(cfe).
       doReturn(mockedBucketEntry).
       when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
-        (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
+        (LongAdder) Mockito.any());
     this.q.add(spiedRqe);
     doDrainOfOneEntry(bc, wt, q);
   }
-- 
2.16.3