From 0b462d3fb285ebd56bf8dc293ee7d45ed70f4351 Mon Sep 17 00:00:00 2001
From: momo <123>
Date: Wed, 21 Aug 2019 23:36:31 +0800
Subject: [PATCH] BucketCache: verify cache data files before restoring the persisted backing map

When a file-backed IOEngine is used with a persistencePath, the backing map
restored at startup may no longer match the cache data files on disk. On
shutdown, persist a verification key (a digest over each cache file's size and
last-modified time; MD5 by default, configurable via
hbase.bucketcache.file.verify.algorithm) together with the backing map. On
startup, recompute the key and restore from file only when it matches; on a
mismatch, or when no persistencePath is configured, delete the stale cache data
files and start with an empty cache. Also set the cached-block counter after a
successful restore.
---
 .../hbase/io/hfile/bucket/BucketCache.java | 166 +++++++++++++++---
 1 file changed, 146 insertions(+), 20 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index c5a1b21c05..ffe6fe671c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -29,6 +29,8 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -72,6 +74,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -127,7 +130,7 @@ public class BucketCache implements BlockCache, HeapSize {
   final static int DEFAULT_WRITER_QUEUE_ITEMS = 64;
 
   // Store/read block data
-  final IOEngine ioEngine;
+  private IOEngine ioEngine;
 
   // Store the block in this map before writing it to cache
   @VisibleForTesting
@@ -242,6 +245,12 @@ public class BucketCache implements BlockCache, HeapSize {
   /** In-memory bucket size */
   private float memoryFactor;
 
+  private String[] filePaths;
+
+  static final String FILE_VERIFY_ALGORITHM = "hbase.bucketcache.file.verify.algorithm";
+  static final String DEFAULT_FILE_VERIFY_ALGORITHM = "MD5";
+  private String algorithmName;
+
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
       int writerThreadNum, int writerQLen, String persistencePath) throws FileNotFoundException,
       IOException {
@@ -252,8 +261,7 @@
   public BucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
       Configuration conf)
-      throws FileNotFoundException, IOException {
-    this.ioEngine = getIOEngineFromName(ioEngineName, capacity);
+      throws IOException {
     this.writerThreads = new WriterThread[writerThreadNum];
     long blockNumCapacity = capacity / blockSize;
     if (blockNumCapacity >= Integer.MAX_VALUE) {
@@ -271,8 +279,8 @@
     sanityCheckConfigs();
 
     LOG.info("Instantiating BucketCache with acceptableFactor: " + acceptableFactor + ", minFactor: " + minFactor +
-        ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor +
-        ", memoryFactor: " + memoryFactor);
+      ", extraFreeFactor: " + extraFreeFactor + ", singleFactor: " + singleFactor + ", multiFactor: " + multiFactor +
+      ", memoryFactor: " + memoryFactor);
 
     this.cacheCapacity = capacity;
     this.persistencePath = persistencePath;
@@ -289,16 +297,45 @@
 
     this.backingMap = new ConcurrentHashMap((int) blockNumCapacity);
 
-    if (ioEngine.isPersistent() && persistencePath != null) {
-      try {
-        retrieveFromFile(bucketSizes);
-      } catch (IOException ioex) {
-        LOG.error("Can't restore from file because of", ioex);
-      } catch (ClassNotFoundException cnfe) {
-        LOG.error("Can't restore from file in rebuild because can't deserialise",cnfe);
-        throw new RuntimeException(cnfe);
+    if (ioEngineName.startsWith("file")) {
+      this.filePaths =
+          ioEngineName.substring(ioEngineName.indexOf(":") + 1).split(FileIOEngine.FILE_DELIMITER);
+      this.algorithmName = conf.get(FILE_VERIFY_ALGORITHM, DEFAULT_FILE_VERIFY_ALGORITHM);
+      if (persistencePath != null) {
+        File persistenceFile = new File(persistencePath);
+        try {
+          String preFilesKey = getPreFilesKey();
+          String filesKey = getFilesKey();
+          LOG.info("preFilesKey: " + preFilesKey + " filesKey: " + filesKey);
+          if (preFilesKey == null || !preFilesKey.equals(filesKey)) {
+            throw new IOException("File verification failed! Can't restore from file.");
+          }
+          ioEngine = getIOEngineFromName(ioEngineName, capacity);
+          retrieveFromFile(bucketSizes);
+        } catch (IOException ioex) {
+          LOG.error("Can't restore from file because of", ioex);
+          // Delete the cache data files and the backingMap persistence file.
+          if (ioEngine != null) {
+            ioEngine.shutdown();
+            ioEngine = null;
+          }
+          deleteCacheDataFile();
+          persistenceFile.delete();
+        } catch (ClassNotFoundException cnfe) {
+          LOG.error("Can't restore from file in rebuild because can't deserialise", cnfe);
+          throw new RuntimeException(cnfe);
+        } catch (NoSuchAlgorithmException nsae) {
+          LOG.error("No such algorithm " + algorithmName, nsae);
+          throw new RuntimeException(nsae);
+        }
+      } else {
+        // persistencePath is not configured, so delete any stale cache data files.
+        deleteCacheDataFile();
       }
     }
+    if (ioEngine == null) {
+      ioEngine = getIOEngineFromName(ioEngineName, capacity);
+    }
     final String threadName = Thread.currentThread().getName();
     this.cacheEnabled = true;
     for (int i = 0; i < writerThreads.length; ++i) {
@@ -312,12 +349,96 @@
     // TODO: Add means of turning this off. Bit obnoxious running thread just to make a log
     // every five minutes.
     this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this),
-        statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
+      statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS);
     LOG.info("Started bucket cache; ioengine=" + ioEngineName +
-        ", capacity=" + StringUtils.byteDesc(capacity) +
-        ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
-        writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
-        persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
+      ", capacity=" + StringUtils.byteDesc(capacity) +
+      ", blockSize=" + StringUtils.byteDesc(blockSize) + ", writerThreadNum=" +
+      writerThreadNum + ", writerQLen=" + writerQLen + ", persistencePath=" +
+      persistencePath + ", bucketAllocator=" + this.bucketAllocator.getClass().getName());
+  }
+
+  /**
+   * Reads the verification key that was written to persistencePath when HBase last shut down.
+   * @return the previously persisted key
+   * @throws IOException
+   */
+  private String getPreFilesKey() throws IOException {
+    FileInputStream fis = null;
+    ObjectInputStream ois = null;
+    try {
+      fis = new FileInputStream(persistencePath);
+      ois = new ObjectInputStream(fis);
+      String result = ois.readUTF();
+      return result;
+    } finally {
+      if (ois != null) ois.close();
+      if (fis != null) fis.close();
+    }
+  }
+
+  /**
+   * Deletes the bucket cache data files.
+   */
+  private void deleteCacheDataFile() {
+    if (filePaths == null)
+      return;
+    for (String file : filePaths) {
+      File file1 = new File(file);
+      file1.delete();
+    }
+  }
+
+  /**
+   * Computes a verification key with a message digest algorithm; the default algorithm is MD5.
+   * @return the key, encoded as a hex string
+   * @throws IOException
+   * @throws NoSuchAlgorithmException
+   */
+  private String getFilesKey() throws IOException, NoSuchAlgorithmException {
+    if (filePaths == null)
+      return null;
+    StringBuffer sb = new StringBuffer();
+    for (String filePath : filePaths) {
+      File file = new File(filePath);
+      if (file.exists()) {
+        sb.append(getFileSize(filePath) + "" + file.lastModified());
+      } else {
+        throw new IOException("Cache file " + filePath + " does not exist.");
+      }
+    }
+    MessageDigest messageDigest = MessageDigest.getInstance(algorithmName);
+    messageDigest.update(sb.toString().getBytes());
+    return bytes2HexString(messageDigest.digest());
+  }
+
+  /**
+   * Uses the Linux du command to get the file's size on disk.
+   * @param filePath path of the cache data file
+   * @return the size reported by du
+   * @throws IOException
+   */
+  private long getFileSize(String filePath) throws IOException {
+    String[] commands = {"du", filePath};
+    Shell.ShellCommandExecutor shell = new Shell.ShellCommandExecutor(commands);
+    shell.execute();
+    return Long.parseLong(shell.getOutput().split("\t")[0]);
+  }
+
+  /**
+   * Converts a byte array to a hex string.
+   * @param byteArray the bytes to encode
+   * @return the hex string
+   */
+  private String bytes2HexString(byte[] byteArray) {
+    StringBuffer sb = new StringBuffer();
+    for (int i = 0; i < byteArray.length; i++) {
+      String hex = Integer.toHexString(byteArray[i] & 0xFF);
+      if (hex.length() < 2) {
+        sb.append(0);
+      }
+      sb.append(hex);
+    }
+    return sb.toString();
   }
 
   private void sanityCheckConfigs() {
@@ -1029,11 +1150,14 @@
       }
       fos = new FileOutputStream(persistencePath, false);
       oos = new ObjectOutputStream(fos);
+      oos.writeUTF(getFilesKey());
       oos.writeLong(cacheCapacity);
       oos.writeUTF(ioEngine.getClass().getName());
       oos.writeUTF(backingMap.getClass().getName());
       oos.writeObject(deserialiserMap);
       oos.writeObject(backingMap);
+    } catch (NoSuchAlgorithmException e) {
+      LOG.error("No such algorithm: " + algorithmName + "! Failed to persist data on exit", e);
     } finally {
       if (oos != null) oos.close();
       if (fos != null) fos.close();
@@ -1041,8 +1165,8 @@
   }
 
   @SuppressWarnings("unchecked")
-  private void retrieveFromFile(int[] bucketSizes) throws IOException, BucketAllocatorException,
-      ClassNotFoundException {
+  private void retrieveFromFile(int[] bucketSizes) throws IOException,
+      ClassNotFoundException {
     File persistenceFile = new File(persistencePath);
     if (!persistenceFile.exists()) {
       return;
@@ -1056,6 +1180,7 @@
           "Attempt to restore non-persistent cache mappings!");
       fis = new FileInputStream(persistencePath);
       ois = new ObjectInputStream(fis);
+      ois.readUTF();
       long capacitySize = ois.readLong();
       if (capacitySize != cacheCapacity)
         throw new IOException("Mismatched cache capacity:"
@@ -1078,6 +1203,7 @@
       bucketAllocator = allocator;
       deserialiserMap = deserMap;
       backingMap = backingMapFromFile;
+      blockNumber.set(backingMap.size());
     } finally {
       if (ois != null) ois.close();
       if (fis != null) fis.close();
-- 
2.17.2 (Apple Git-113)
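For readers who want the verification scheme in one self-contained place, the following standalone sketch (not part of the patch) shows what the key amounts to: a hex-encoded digest, MD5 by default and configurable through hbase.bucketcache.file.verify.algorithm, over each cache file's size and last-modified time, mirroring getFilesKey() and bytes2HexString() above. The class name FileKeySketch, the computeKey() method, and the /tmp/bucket.cache path are illustrative assumptions only; the patch itself shells out to du for the on-disk size rather than using File.length().

import java.io.File;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Standalone sketch of the verification-key computation; names are illustrative,
// not part of HBase or of this patch.
public class FileKeySketch {

  // Digests (length, lastModified) of every cache file and hex-encodes the result.
  // The patch reads the on-disk size via `du` instead of File.length().
  static String computeKey(String algorithm, String... cacheFilePaths)
      throws IOException, NoSuchAlgorithmException {
    StringBuilder sb = new StringBuilder();
    for (String path : cacheFilePaths) {
      File f = new File(path);
      if (!f.exists()) {
        throw new IOException("Cache file " + path + " does not exist.");
      }
      sb.append(f.length()).append(f.lastModified());
    }
    MessageDigest md = MessageDigest.getInstance(algorithm); // e.g. the default "MD5"
    StringBuilder hex = new StringBuilder();
    for (byte b : md.digest(sb.toString().getBytes())) {
      hex.append(String.format("%02x", b)); // zero-padded, like bytes2HexString
    }
    return hex.toString();
  }

  public static void main(String[] args) throws Exception {
    // Illustrative path; in HBase the paths come from the file: ioengine configuration.
    System.out.println("verification key: " + computeKey("MD5", "/tmp/bucket.cache"));
  }
}

If the key computed this way differs from the one stored at the head of the persistence file, the restore path in the patch refuses to reuse the cache and deletes the stale data files instead.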