diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
index f2c686d..7f1fb1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@ -20,11 +20,15 @@ package org.apache.hadoop.hbase.snapshot;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -497,7 +501,7 @@ public final class SnapshotManifest {
     // Write the v2 Data Manifest.
     // Once the data-manifest is written, the snapshot can be considered complete.
     // Currently snapshots are written in a "temporary" directory and later
-    // moved to the "complated" snapshot directory.
+    // moved to the "completed" snapshot directory.
     SnapshotDataManifest dataManifest = dataManifestBuilder.build();
     writeDataManifest(dataManifest);
     this.regionManifests = dataManifest.getRegionManifestsList();
@@ -507,14 +511,59 @@
     // The extra files in the snapshot directory will not give any problem,
     // since they have the same content as the data manifest, and even by re-reading
     // them we will get the same information.
-    if (v1Regions != null && v1Regions.size() > 0) {
-      for (SnapshotRegionManifest regionManifest: v1Regions) {
-        SnapshotManifestV1.deleteRegionManifest(fs, workingDir, regionManifest);
-      }
-    }
-    if (v2Regions != null && v2Regions.size() > 0) {
-      for (SnapshotRegionManifest regionManifest: v2Regions) {
-        SnapshotManifestV2.deleteRegionManifest(fs, workingDir, regionManifest);
+    // Do the deletion in a thread pool, which can dramatically speed up this section of taking a
+    // snapshot on filesystems that are remote from HDFS, such as S3 on AWS.
+    if ((v1Regions != null && v1Regions.size() > 0) ||
+        (v2Regions != null && v2Regions.size() > 0)) {
+      ThreadPoolExecutor tpoolDelete = createExecutor("SnapshotRegionManifestDeletePool");
+      try {
+        final ExecutorCompletionService<Void> completionService =
+          new ExecutorCompletionService<>(tpoolDelete);
+        if (v1Regions != null && v1Regions.size() > 0) {
+          for (final SnapshotRegionManifest regionManifest : v1Regions) {
+            completionService.submit(new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                SnapshotManifestV1.deleteRegionManifest(fs, workingDir, regionManifest);
+                return null;
+              }
+            });
+          }
+        }
+
+        if (v2Regions != null && v2Regions.size() > 0) {
+          for (final SnapshotRegionManifest regionManifest : v2Regions) {
+            completionService.submit(new Callable<Void>() {
+              @Override
+              public Void call() throws Exception {
+                SnapshotManifestV2.deleteRegionManifest(fs, workingDir, regionManifest);
+                return null;
+              }
+            });
+          }
+        }
+
+        // Wait for each submitted task; the tasks return nothing, but waiting ensures that every
+        // deletion has completed before we shut down the thread pool.
+        int expectedTasks = (v1Regions != null ? v1Regions.size() : 0) +
+          (v2Regions != null ? v2Regions.size() : 0);
+        for (int ix = 0; ix < expectedTasks; ix++) {
+          completionService.take().get();
+        }
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException(e.getMessage());
+      } catch (ExecutionException e) {
+        Throwable t = e.getCause();
+
+        if (t instanceof InvalidProtocolBufferException) {
+          throw (InvalidProtocolBufferException) t;
+        } else {
+          IOException ex = new IOException("ExecutionException");
+          ex.initCause(e.getCause());
+          throw ex;
+        }
+      } finally {
+        tpoolDelete.shutdown();
       }
     }
   }