diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 31b2a15..e469cbf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5470,14 +5470,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId, + public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException { return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false); } @Override - public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException { + public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, + boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException { long seqId = -1; Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR); Map<String, Long> storeFilesSizes = new HashMap<String, Long>(); @@ -5532,7 +5532,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // problem when validating LOG.warn("There was a recoverable bulk load failure likely due to a" + " split. These (family, HFile) pairs were not loaded: " + list); - return isSuccessful; + return null; } // We need to assign a sequential ID that's in between two memstores in order to preserve @@ -5626,7 +5626,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi closeBulkRegionOperation(); } - return isSuccessful; + return isSuccessful ? 
storeFiles : null; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index cd62115..4dd34dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.LongAdder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ByteBufferedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScannable; @@ -2142,6 +2143,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, Region region = getRegion(request.getRegion()); boolean bypass = false; boolean loaded = false; + Map<byte[], List<Path>> map = null; if (!request.hasBulkToken()) { // Old style bulk load. 
This will not be supported in future releases @@ -2155,15 +2157,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, + map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, request.getCopyFile()); + if (map != null) { + loaded = true; + } } if (region.getCoprocessorHost() != null) { loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); } } else { // secure bulk load - loaded = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); + map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); } BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); builder.setLoaded(loaded); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 1b106b2..09016e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HBaseInterfaceAudience; @@ -558,11 +559,11 @@ public interface Region extends ConfigurationObserver { * @param bulkLoadListener Internal hooks enabling massaging/preparation of a * file about to be bulk loaded * @param assignSeqId - * @return true if successful, false if failed recoverably + * @return Map from family to List of store file paths if successful, null if failed recoverably * @throws IOException if failed unrecoverably. 
*/ - boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener) throws IOException; + Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, + boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException; /** * Attempts to atomically load a group of hfiles. This is critical for loading @@ -573,11 +574,11 @@ * @param bulkLoadListener Internal hooks enabling massaging/preparation of a * file about to be bulk loaded * @param copyFile always copy hfiles if true - * @return true if successful, false if failed recoverably + * @return Map from family to List of store file paths if successful, null if failed recoverably * @throws IOException if failed unrecoverably. */ - boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId, - BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException; + Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, + boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException; /////////////////////////////////////////////////////////////////////////// // Coprocessors diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index e84ca40..2f0798f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -171,7 +171,7 @@ public class SecureBulkLoadManager { fs.delete(new Path(request.getBulkToken()), true); } - public boolean secureBulkLoadHFiles(final Region region, + public Map<byte[], List<Path>> secureBulkLoadHFiles(final Region region, final BulkLoadHFileRequest request) throws IOException { final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount()); for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : 
request.getFamilyPathList()) { @@ -200,6 +200,8 @@ bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } boolean loaded = false; + Map<byte[], List<Path>> map = null; + if (!bypass) { // Get the target fs (HBase region server fs) delegation token // Since we have checked the permission via 'preBulkLoadHFile', now let's give @@ -217,9 +219,9 @@ } } - loaded = ugi.doAs(new PrivilegedAction<Boolean>() { + map = ugi.doAs(new PrivilegedAction<Map<byte[], List<Path>>>() { @Override - public Boolean run() { + public Map<byte[], List<Path>> run() { FileSystem fs = null; try { fs = FileSystem.get(conf); @@ -237,14 +239,14 @@ } catch (Exception e) { LOG.error("Failed to complete bulk load", e); } - return false; + return null; } }); } if (region.getCoprocessorHost() != null) { - loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); } - return loaded; + return map; } private List<BulkLoadObserver> getBulkLoadObservers(Region region) {