From 74c64f7487a53a8a4b5b1542207c89b447c7ef66 Mon Sep 17 00:00:00 2001
From: Jurriaan Mous
Date: Mon, 23 May 2016 11:23:12 +0200
Subject: [PATCH] HBASE-15876 Remove doBulkLoad(Path hfofDir, final HTable
 table) though it has not been through a full deprecation cycle

---
 .../hbase/mapreduce/LoadIncrementalHFiles.java     | 124 ++++++++++-----------
 .../mob/compactions/PartitionedMobCompactor.java   | 110 +++++++++---------
 2 files changed, 116 insertions(+), 118 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 0084878..5781a42 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -25,6 +25,32 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;

+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,7 +71,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.RegionServerCallable;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
@@ -76,32 +101,6 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;

-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
 * @see #usage()
@@ -165,7 +164,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       + "\n");
   }

-  private static interface BulkHFileVisitor<TFamily> {
+  private interface BulkHFileVisitor<TFamily> {
     TFamily bulkFamily(final byte[] familyName)
       throws IOException;
     void bulkHFile(final TFamily family, final FileStatus hfileStatus)
@@ -308,25 +307,7 @@
    * pre-existing table.  This method is not threadsafe.
    *
    * @param hfofDir the directory that was provided as the output path
-   * of a job using HFileOutputFormat
-   * @param table the table to load into
-   * @throws TableNotFoundException if table does not yet exist
-   */
-  @SuppressWarnings("deprecation")
-  public void doBulkLoad(Path hfofDir, final HTable table)
-      throws TableNotFoundException, IOException {
-    try (Admin admin = table.getConnection().getAdmin();
-        RegionLocator rl = table.getRegionLocator()) {
-      doBulkLoad(hfofDir, admin, table, rl);
-    }
-  }
-
-  /**
-   * Perform a bulk load of the given directory into the given
-   * pre-existing table.  This method is not threadsafe.
-   *
-   * @param hfofDir the directory that was provided as the output path
-   * of a job using HFileOutputFormat
+   *   of a job using HFileOutputFormat
    * @param table the table to load into
    * @throws TableNotFoundException if table does not yet exist
    */
@@ -341,7 +322,7 @@

     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
-    Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+    Deque<LoadQueueItem> queue = new LinkedList<>();
     try {
       /*
        * Checking hfile format is a time-consuming operation, we should have an option to skip
@@ -426,8 +407,8 @@
     }

     if (queue != null && !queue.isEmpty()) {
-        throw new RuntimeException("Bulk load aborted with some files not yet loaded."
-            + "Please check log for more details.");
+      throw new RuntimeException("Bulk load aborted with some files not yet loaded."
+          + "Please check log for more details.");
     }
   }

@@ -563,9 +544,9 @@
   private boolean checkHFilesCountPerRegionPerFamily(
     final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
     for (Entry<ByteBuffer,
-        ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
+      ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
       final Collection<LoadQueueItem> lqis = e.getValue();
-      HashMap<byte[], MutableInt> filesMap = new HashMap<byte[], MutableInt>();
+      HashMap<byte[], MutableInt> filesMap = new HashMap<>();
       for (LoadQueueItem lqi: lqis) {
         MutableInt count = filesMap.get(lqi.family);
         if (count == null) {
@@ -597,7 +578,7 @@
     final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);

     // drain LQIs and figure out bulk load groups
-    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<Future<List<LoadQueueItem>>>();
+    Set<Future<List<LoadQueueItem>>> splittingFutures = new HashSet<>();
     while (!queue.isEmpty()) {
       final LoadQueueItem item = queue.remove();

@@ -650,7 +631,7 @@
     }

     LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-      "region. Splitting...");
+        "region. Splitting...");

     String uniqueName = getUniqueName();
     HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
@@ -692,7 +673,7 @@
    * LQI's corresponding to the resultant hfiles.
    *
    * protected for testing
-   * @throws IOException
+   * @throws IOException if an IO failure is encountered
    */
   protected List<LoadQueueItem> groupOrSplit(Multimap<ByteBuffer, LoadQueueItem> regionGroups,
       final LoadQueueItem item, final Table table,
@@ -786,13 +767,13 @@
    * Protected for testing.
    *
    * @return empty list if success, list of items to retry on recoverable
-   * failure
+   *   failure
    */
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
       final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
   throws IOException {
     final List<Pair<byte[], String>> famPaths =
-        new ArrayList<Pair<byte[], String>>(lqis.size());
+        new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
     }
@@ -857,7 +838,7 @@
     };

     try {
-      List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
+      List<LoadQueueItem> toRetry = new ArrayList<>();
       Configuration conf = getConf();
       boolean success = RpcRetryingCallerFactory.instantiate(conf, null).
           <Boolean> newCaller()
@@ -890,8 +871,7 @@
   static void splitStoreFile(
       Configuration conf, Path inFile,
       HColumnDescriptor familyDesc, byte[] splitKey,
-      Path bottomOut, Path topOut) throws IOException
-  {
+      Path bottomOut, Path topOut) throws IOException {
     // Open reader with no block cache, and not in-memory
     Reference topReference = Reference.createTopReference(splitKey);
     Reference bottomReference = Reference.createBottomReference(splitKey);
@@ -944,8 +924,12 @@
         }
       }
     } finally {
-      if (halfWriter != null) halfWriter.close();
-      if (halfReader != null) halfReader.close(cacheConf.shouldEvictOnClose());
+      if (halfWriter != null) {
+        halfWriter.close();
+      }
+      if (halfReader != null) {
+        halfReader.close(cacheConf.shouldEvictOnClose());
+      }
     }
   }

@@ -972,16 +956,20 @@
    * 2) Return the boundary list.
    */
   public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
-    ArrayList<byte[]> keysArray = new ArrayList<byte[]>();
+    ArrayList<byte[]> keysArray = new ArrayList<>();
     int runningValue = 0;
     byte[] currStartKey = null;
     boolean firstBoundary = true;

     for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
-      if (runningValue == 0) currStartKey = item.getKey();
+      if (runningValue == 0) {
+        currStartKey = item.getKey();
+      }
       runningValue += item.getValue();
       if (runningValue == 0) {
-        if (!firstBoundary) keysArray.add(currStartKey);
+        if (!firstBoundary) {
+          keysArray.add(currStartKey);
+        }
         firstBoundary = false;
       }
     }
@@ -1000,7 +988,7 @@
     // Add column families
     // Build a set of keys
     final HTableDescriptor htd = new HTableDescriptor(tableName);
-    final TreeMap<byte[], Integer> map = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+    final TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
       @Override
       public HColumnDescriptor bulkFamily(final byte[] familyName) {
@@ -1073,8 +1061,8 @@
     Path hfofDir = new Path(dirPath);

     try (Table table = connection.getTable(tableName);
-        RegionLocator locator = connection.getRegionLocator(tableName)) {
-      doBulkLoad(hfofDir, admin, table, locator);
+      RegionLocator locator = connection.getRegionLocator(tableName)) {
+        doBulkLoad(hfofDir, admin, table, locator);
     }
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index cfe76ae..b4d4bab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.HFileLink;
@@ -90,10 +89,10 @@ public class PartitionedMobCompactor extends MobCompactor {
   protected int compactionBatchSize;
   protected int compactionKVMax;

-  private Path tempPath;
-  private Path bulkloadPath;
-  private CacheConfig compactionCacheConfig;
-  private Tag tableNameTag;
+  private final Path tempPath;
+  private final Path bulkloadPath;
+  private final CacheConfig compactionCacheConfig;
+  private final Tag tableNameTag;
   private Encryption.Context cryptoContext = Encryption.Context.NONE;

   public PartitionedMobCompactor(Configuration conf, FileSystem fs, TableName tableName,
@@ -137,13 +136,12 @@
    * @param candidates All the candidates.
    * @param allFiles Whether add all mob files into the compaction.
    * @return A compaction request.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected PartitionedMobCompactionRequest select(List<FileStatus> candidates,
     boolean allFiles) throws IOException {
-    Collection<FileStatus> allDelFiles = new ArrayList<FileStatus>();
-    Map<CompactionPartitionId, CompactionPartition> filesToCompact =
-      new HashMap<CompactionPartitionId, CompactionPartition>();
+    Collection<FileStatus> allDelFiles = new ArrayList<>();
+    Map<CompactionPartitionId, CompactionPartition> filesToCompact = new HashMap<>();
     int selectedFileCount = 0;
     int irrelevantFileCount = 0;
     for (FileStatus file : candidates) {
@@ -202,17 +200,17 @@
    *
    * @param request The compaction request.
    * @return The paths of new mob files generated in the compaction.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected List<Path> performCompaction(PartitionedMobCompactionRequest request)
     throws IOException {
     // merge the del files
-    List<Path> delFilePaths = new ArrayList<Path>();
+    List<Path> delFilePaths = new ArrayList<>();
     for (FileStatus delFile : request.delFiles) {
       delFilePaths.add(delFile.getPath());
     }
     List<Path> newDelPaths = compactDelFiles(request, delFilePaths);
-    List<StoreFile> newDelFiles = new ArrayList<StoreFile>();
+    List<StoreFile> newDelFiles = new ArrayList<>();
     List<Path> paths = null;
     try {
       for (Path newDelPath : newDelPaths) {
@@ -247,7 +245,7 @@
    * @param request The compaction request.
    * @param delFiles The del files.
    * @return The paths of new mob files after compactions.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected List<Path> compactMobFiles(final PartitionedMobCompactionRequest request,
     final List<StoreFile> delFiles) throws IOException {
@@ -256,24 +254,23 @@
       LOG.info("No partitions of mob files");
       return Collections.emptyList();
     }
-    List<Path> paths = new ArrayList<Path>();
-    Connection c = ConnectionFactory.createConnection(conf);
+    List<Path> paths = new ArrayList<>();
+    final Connection c = ConnectionFactory.createConnection(conf);
     final Table table = c.getTable(tableName);
     try {
-      Map<CompactionPartitionId, Future<List<Path>>> results =
-        new HashMap<CompactionPartitionId, Future<List<Path>>>();
+      Map<CompactionPartitionId, Future<List<Path>>> results = new HashMap<>();
       // compact the mob files by partitions in parallel.
       for (final CompactionPartition partition : partitions) {
         results.put(partition.getPartitionId(), pool.submit(new Callable<List<Path>>() {
           @Override
           public List<Path> call() throws Exception {
             LOG.info("Compacting mob files for partition " + partition.getPartitionId());
-            return compactMobFilePartition(request, partition, delFiles, table);
+            return compactMobFilePartition(request, partition, delFiles, c, table);
           }
         }));
       }
       // compact the partitions in parallel.
-      List<CompactionPartitionId> failedPartitions = new ArrayList<CompactionPartitionId>();
+      List<CompactionPartitionId> failedPartitions = new ArrayList<>();
       for (Entry<CompactionPartitionId, Future<List<Path>>> result : results.entrySet()) {
         try {
           paths.addAll(result.getValue().get());
@@ -291,7 +288,7 @@
       try {
         table.close();
       } catch (IOException e) {
-        LOG.error("Failed to close the HTable", e);
+        LOG.error("Failed to close the Table", e);
       }
     }
     return paths;
@@ -302,13 +299,16 @@
    * @param request The compaction request.
    * @param partition A compaction partition.
    * @param delFiles The del files.
-   * @param table The current table.
-   * @return The paths of new mob files after compactions.
-   * @throws IOException
+   * @param connection to use
+   * @param table The current table. @return The paths of new mob files after compactions.
+   * @throws IOException if IO failure is encountered
    */
   private List<Path> compactMobFilePartition(PartitionedMobCompactionRequest request,
-    CompactionPartition partition, List<StoreFile> delFiles, Table table) throws IOException {
-    List<Path> newFiles = new ArrayList<Path>();
+                                             CompactionPartition partition,
+                                             List<StoreFile> delFiles,
+                                             Connection connection,
+                                             Table table) throws IOException {
+    List<Path> newFiles = new ArrayList<>();
     List<FileStatus> files = partition.listFiles();
     int offset = 0;
     Path bulkloadPathOfPartition = new Path(bulkloadPath, partition.getPartitionId().toString());
@@ -328,7 +328,7 @@
         // clean the bulkload directory to avoid loading old files.
         fs.delete(bulkloadPathOfPartition, true);
         // add the selected mob files and del files into filesToCompact
-        List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
+        List<StoreFile> filesToCompact = new ArrayList<>();
         for (int i = offset; i < batch + offset; i++) {
           StoreFile sf = new StoreFile(fs, files.get(i).getPath(), conf, compactionCacheConfig,
             BloomType.NONE);
@@ -336,7 +336,7 @@
         }
         filesToCompact.addAll(delFiles);
         // compact the mob files in a batch.
-        compactMobFilesInBatch(request, partition, table, filesToCompact, batch,
+        compactMobFilesInBatch(request, partition, connection, table, filesToCompact, batch,
           bulkloadPathOfPartition, bulkloadColumnPath, newFiles);
         // move to the next batch.
         offset += batch;
@@ -364,19 +364,23 @@
    * Compacts a partition of selected small mob files and all the del files in a batch.
    * @param request The compaction request.
    * @param partition A compaction partition.
+   * @param connection To use for transport
    * @param table The current table.
    * @param filesToCompact The files to be compacted.
    * @param batch The number of mob files to be compacted in a batch.
    * @param bulkloadPathOfPartition The directory where the bulkload column of the current
-   * partition is saved.
+   *   partition is saved.
    * @param bulkloadColumnPath The directory where the bulkload files of current partition
-   * are saved.
+   *   are saved.
    * @param newFiles The paths of new mob files after compactions.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private void compactMobFilesInBatch(PartitionedMobCompactionRequest request,
-    CompactionPartition partition, Table table, List<StoreFile> filesToCompact, int batch,
-    Path bulkloadPathOfPartition, Path bulkloadColumnPath, List<Path> newFiles)
+                                      CompactionPartition partition,
+                                      Connection connection, Table table,
+                                      List<StoreFile> filesToCompact, int batch,
+                                      Path bulkloadPathOfPartition, Path bulkloadColumnPath,
+                                      List<Path> newFiles)
     throws IOException {
     // open scanner to the selected mob files and del files.
     StoreScanner scanner = createScanner(filesToCompact, ScanType.COMPACT_DROP_DELETES);
@@ -400,8 +404,8 @@
       refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, fileInfo
         .getSecond().longValue(), compactionCacheConfig, cryptoContext);
       refFilePath = refFileWriter.getPath();
-      List<Cell> cells = new ArrayList<Cell>();
-      boolean hasMore = false;
+      List<Cell> cells = new ArrayList<>();
+      boolean hasMore;
       ScannerContext scannerContext =
         ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
       do {
@@ -428,7 +432,7 @@
       // commit mob file
       MobUtils.commitFile(conf, fs, filePath, mobFamilyDir, compactionCacheConfig);
       // bulkload the ref file
-      bulkloadRefFile(table, bulkloadPathOfPartition, filePath.getName());
+      bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
       newFiles.add(new Path(mobFamilyDir, filePath.getName()));
     } else {
       // remove the new files
@@ -450,10 +454,10 @@
   /**
    * Compacts the del files in batches which avoids opening too many files.
    * @param request The compaction request.
-   * @param delFilePaths
+   * @param delFilePaths Del file paths to compact
    * @return The paths of new del files after merging or the original files if no merging
    * is necessary.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   protected List<Path> compactDelFiles(PartitionedMobCompactionRequest request,
     List<Path> delFilePaths) throws IOException {
@@ -462,14 +466,14 @@
     }
     // when there are more del files than the number that is allowed, merge it firstly.
     int offset = 0;
-    List<Path> paths = new ArrayList<Path>();
+    List<Path> paths = new ArrayList<>();
     while (offset < delFilePaths.size()) {
       // get the batch
       int batch = compactionBatchSize;
       if (delFilePaths.size() - offset < compactionBatchSize) {
         batch = delFilePaths.size() - offset;
       }
-      List<StoreFile> batchedDelFiles = new ArrayList<StoreFile>();
+      List<StoreFile> batchedDelFiles = new ArrayList<>();
       if (batch == 1) {
         // only one file left, do not compact it, directly add it to the new files.
         paths.add(delFilePaths.get(offset));
@@ -493,7 +497,7 @@
    * @param request The compaction request.
    * @param delFiles The del files.
    * @return The path of new del file after merging.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private Path compactDelFilesInBatch(PartitionedMobCompactionRequest request,
     List<StoreFile> delFiles) throws IOException {
@@ -507,8 +511,8 @@
       column.getCompactionCompression(), HConstants.EMPTY_START_ROW, compactionCacheConfig,
       cryptoContext);
     filePath = writer.getPath();
-    List<Cell> cells = new ArrayList<Cell>();
-    boolean hasMore = false;
+    List<Cell> cells = new ArrayList<>();
+    boolean hasMore;
     ScannerContext scannerContext =
       ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build();
     do {
@@ -544,7 +548,7 @@
    * @param filesToCompact The files to be compacted.
    * @param scanType The scan type.
    * @return The store scanner.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private StoreScanner createScanner(List<StoreFile> filesToCompact, ScanType scanType)
     throws IOException {
@@ -561,17 +565,23 @@

   /**
    * Bulkloads the current file.
+   *
+   * @param connection to use to get admin/RegionLocator
    * @param table The current table.
    * @param bulkloadDirectory The path of bulkload directory.
    * @param fileName The current file name.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
-  private void bulkloadRefFile(Table table, Path bulkloadDirectory, String fileName)
+  private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
+                               String fileName)
     throws IOException {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
-      bulkload.doBulkLoad(bulkloadDirectory, (HTable)table);
+      bulkload.doBulkLoad(bulkloadDirectory,
+                          connection.getAdmin(),
+                          table,
+                          connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
       // delete the committed mob file
       deletePath(new Path(mobFamilyDir, fileName));
@@ -587,7 +597,7 @@
    * @param writer The mob file writer.
    * @param maxSeqId Maximum sequence id.
    * @param mobCellsCount The number of mob cells.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private void closeMobFileWriter(StoreFileWriter writer, long maxSeqId, long mobCellsCount)
     throws IOException {
@@ -606,7 +616,7 @@
    * @param writer The ref file writer.
    * @param maxSeqId Maximum sequence id.
    * @param bulkloadTime The timestamp at which the bulk load file is created.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private void closeRefFileWriter(StoreFileWriter writer, long maxSeqId, long bulkloadTime)
     throws IOException {
@@ -626,7 +636,7 @@
    * Gets the max seqId and number of cells of the store files.
    * @param storeFiles The store files.
    * @return The pair of the max seqId and number of cells of the store files.
-   * @throws IOException
+   * @throws IOException if IO failure is encountered
    */
   private Pair<Long, Long> getFileInfo(List<StoreFile> storeFiles) throws IOException {
     long maxSeqId = 0;
@@ -639,7 +649,7 @@
         maxKeyCount += Bytes.toLong(count);
       }
     }
-    return new Pair<Long, Long>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
+    return new Pair<>(Long.valueOf(maxSeqId), Long.valueOf(maxKeyCount));
   }

   /**
-- 
2.5.0
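
Usage note (illustrative, not part of the patch): with the one-argument HTable overload removed, callers must supply the Admin, Table and RegionLocator themselves, all obtained from a shared Connection -- the same pattern the patch applies inside bulkloadRefFile() and in LoadIncrementalHFiles.run() above. A minimal migration sketch follows; the class name, argument handling and try-with-resources structure are assumptions for the example, not code from the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadMigrationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Path hfofDir = new Path(args[0]);                 // output dir of an HFileOutputFormat job
        TableName tableName = TableName.valueOf(args[1]); // pre-existing target table

        // Before this patch a caller could pass a raw HTable:
        //   new LoadIncrementalHFiles(conf).doBulkLoad(hfofDir, (HTable) table);
        // After it, the surviving overload takes explicit handles, all drawn
        // from one Connection and closed by try-with-resources:
        try (Connection connection = ConnectionFactory.createConnection(conf);
            Admin admin = connection.getAdmin();
            Table table = connection.getTable(tableName);
            RegionLocator locator = connection.getRegionLocator(tableName)) {
          new LoadIncrementalHFiles(conf).doBulkLoad(hfofDir, admin, table, locator);
        }
      }
    }

One design consequence worth noting: because the Connection is now created and owned by the caller rather than extracted from an HTable, the caller also controls its lifetime, which is what lets PartitionedMobCompactor thread a single Connection through compactMobFiles(), compactMobFilePartition(), compactMobFilesInBatch() and bulkloadRefFile() in the hunks above instead of casting its Table back to HTable.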