diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 4e9e0b4..f0141df 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -21077,6 +21077,16 @@ public final class ClientProtos {
*/
com.google.protobuf.ByteString
getBulkTokenBytes();
+
+ // optional bool copy_file = 6 [default = false];
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ boolean hasCopyFile();
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ boolean getCopyFile();
}
/**
* Protobuf type {@code hbase.pb.BulkLoadHFileRequest}
@@ -21179,6 +21189,11 @@ public final class ClientProtos {
bulkToken_ = input.readBytes();
break;
}
+ case 48: {
+ bitField0_ |= 0x00000010;
+ copyFile_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -21979,12 +21994,29 @@ public final class ClientProtos {
}
}
+ // optional bool copy_file = 6 [default = false];
+ public static final int COPY_FILE_FIELD_NUMBER = 6;
+ private boolean copyFile_;
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ public boolean hasCopyFile() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ public boolean getCopyFile() {
+ return copyFile_;
+ }
+
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
familyPath_ = java.util.Collections.emptyList();
assignSeqNum_ = false;
fsToken_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken.getDefaultInstance();
bulkToken_ = "";
+ copyFile_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -22027,6 +22059,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(5, getBulkTokenBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBool(6, copyFile_);
+ }
getUnknownFields().writeTo(output);
}
@@ -22056,6 +22091,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(5, getBulkTokenBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(6, copyFile_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -22101,6 +22140,11 @@ public final class ClientProtos {
result = result && getBulkToken()
.equals(other.getBulkToken());
}
+ result = result && (hasCopyFile() == other.hasCopyFile());
+ if (hasCopyFile()) {
+ result = result && (getCopyFile()
+ == other.getCopyFile());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -22134,6 +22178,10 @@ public final class ClientProtos {
hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER;
hash = (53 * hash) + getBulkToken().hashCode();
}
+ if (hasCopyFile()) {
+ hash = (37 * hash) + COPY_FILE_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getCopyFile());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -22274,6 +22322,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000008);
bulkToken_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
+ copyFile_ = false;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -22335,6 +22385,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000008;
}
result.bulkToken_ = bulkToken_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.copyFile_ = copyFile_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -22391,6 +22445,9 @@ public final class ClientProtos {
bulkToken_ = other.bulkToken_;
onChanged();
}
+ if (other.hasCopyFile()) {
+ setCopyFile(other.getCopyFile());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -23013,6 +23070,39 @@ public final class ClientProtos {
return this;
}
+ // optional bool copy_file = 6 [default = false];
+ private boolean copyFile_ ;
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ public boolean hasCopyFile() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ public boolean getCopyFile() {
+ return copyFile_;
+ }
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ public Builder setCopyFile(boolean value) {
+ bitField0_ |= 0x00000020;
+ copyFile_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional bool copy_file = 6 [default = false];
+ */
+ public Builder clearCopyFile() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ copyFile_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest)
}
@@ -39320,81 +39410,81 @@ public final class ClientProtos {
"_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" +
"ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" +
"scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" +
- "s\"\206\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
+ "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
"(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" +
"path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" +
"st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" +
"\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" +
- "en\022\022\n\nbulk_token\030\005 \001(\t\032*\n\nFamilyPath\022\016\n\006" +
- "family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF",
- "ileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegatio" +
- "nToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002" +
- " \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026P" +
- "repareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(" +
- "\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031" +
- ".hbase.pb.RegionSpecifier\"-\n\027PrepareBulk" +
- "LoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Clea" +
- "nupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)" +
- "\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecifie" +
- "r\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproces",
- "sorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_n" +
- "ame\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reques" +
- "t\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005v" +
- "alue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031C" +
- "oprocessorServiceRequest\022)\n\006region\030\001 \002(\013" +
- "2\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(" +
- "\0132 .hbase.pb.CoprocessorServiceCall\"o\n\032C" +
- "oprocessorServiceResponse\022)\n\006region\030\001 \002(" +
- "\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 " +
- "\002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022",
- "\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase" +
- ".pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.p" +
- "b.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.C" +
- "oprocessorServiceCall\"k\n\014RegionAction\022)\n" +
- "\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" +
- "\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" +
- ".pb.Action\"c\n\017RegionLoadStats\022\027\n\014memstor" +
- "eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010" +
- "\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Multi" +
- "RegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.",
- "pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase" +
- ".pb.RegionLoadStats\"\336\001\n\021ResultOrExceptio" +
- "n\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase" +
- ".pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb" +
- ".NameBytesPair\022:\n\016service_result\030\004 \001(\0132\"" +
- ".hbase.pb.CoprocessorServiceResult\0220\n\tlo" +
- "adStats\030\005 \001(\0132\031.hbase.pb.RegionLoadStats" +
- "B\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOrEx" +
- "ception\030\001 \003(\0132\033.hbase.pb.ResultOrExcepti" +
- "on\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameByt",
- "esPair\"x\n\014MultiRequest\022,\n\014regionAction\030\001" +
- " \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGro" +
- "up\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.C" +
- "ondition\"\226\001\n\rMultiResponse\0228\n\022regionActi" +
- "onResult\030\001 \003(\0132\034.hbase.pb.RegionActionRe" +
- "sult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatist" +
- "ics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStat" +
- "s*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE" +
- "\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.G" +
- "etRequest\032\025.hbase.pb.GetResponse\022;\n\006Muta",
- "te\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.M" +
- "utateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReq" +
- "uest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoad" +
- "HFile\022\036.hbase.pb.BulkLoadHFileRequest\032\037." +
- "hbase.pb.BulkLoadHFileResponse\022V\n\017Prepar" +
- "eBulkLoad\022 .hbase.pb.PrepareBulkLoadRequ" +
- "est\032!.hbase.pb.PrepareBulkLoadResponse\022V" +
- "\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBulk" +
- "LoadRequest\032!.hbase.pb.CleanupBulkLoadRe" +
- "sponse\022X\n\013ExecService\022#.hbase.pb.Coproce",
- "ssorServiceRequest\032$.hbase.pb.Coprocesso" +
- "rServiceResponse\022d\n\027ExecRegionServerServ" +
- "ice\022#.hbase.pb.CoprocessorServiceRequest" +
- "\032$.hbase.pb.CoprocessorServiceResponse\0228" +
- "\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbase." +
- "pb.MultiResponseBB\n*org.apache.hadoop.hb" +
- "ase.protobuf.generatedB\014ClientProtosH\001\210\001" +
- "\001\240\001\001"
+ "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(" +
+ "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014",
+ "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" +
+ "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" +
+ "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" +
+ " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" +
+ "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" +
+ "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" +
+ "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" +
+ "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" +
+ "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013" +
+ "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu",
+ "lkLoadResponse\"a\n\026CoprocessorServiceCall" +
+ "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" +
+ "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" +
+ "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" +
+ "base.pb.NameBytesPair\"v\n\031CoprocessorServ" +
+ "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" +
+ "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" +
+ "oprocessorServiceCall\"o\n\032CoprocessorServ" +
+ "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" +
+ "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb",
+ ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" +
+ "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" +
+ "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" +
+ "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" +
+ "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" +
+ "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" +
+ "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" +
+ "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" +
+ "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction" +
+ "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat",
+ "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" +
+ "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" +
+ "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
+ "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" +
+ "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" +
+ "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" +
+ "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" +
+ "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" +
+ "ActionResult\0226\n\021resultOrException\030\001 \003(\0132" +
+ "\033.hbase.pb.ResultOrException\022*\n\texceptio",
+ "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" +
+ "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" +
+ "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" +
+ "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" +
+ "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" +
+ "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" +
+ "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" +
+ "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" +
+ "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS" +
+ "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb",
+ "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." +
+ "MutateRequest\032\030.hbase.pb.MutateResponse\022" +
+ "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" +
+ "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." +
+ "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" +
+ "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" +
+ "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" +
+ ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" +
+ "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!." +
+ "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec",
+ "Service\022#.hbase.pb.CoprocessorServiceReq" +
+ "uest\032$.hbase.pb.CoprocessorServiceRespon" +
+ "se\022d\n\027ExecRegionServerService\022#.hbase.pb" +
+ ".CoprocessorServiceRequest\032$.hbase.pb.Co" +
+ "processorServiceResponse\0228\n\005Multi\022\026.hbas" +
+ "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" +
+ "seBB\n*org.apache.hadoop.hbase.protobuf.g" +
+ "eneratedB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -39502,7 +39592,7 @@ public final class ClientProtos {
internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor,
- new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", });
+ new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "CopyFile", });
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor =
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0);
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index adb66f7..6c0c00c 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -339,6 +339,7 @@ message BulkLoadHFileRequest {
optional bool assign_seq_num = 3;
optional DelegationToken fs_token = 4;
optional string bulk_token = 5;
+ optional bool copy_file = 6 [default = false];
message FamilyPath {
required bytes family = 1;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index debaec9..cf7ffc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5422,6 +5422,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+ }
+
+ @Override
+ public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId,
+ BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
long seqId = -1;
Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR);
Map storeFilesSizes = new HashMap();
@@ -5505,7 +5511,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
- Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
+ Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId, copyFile);
// Note the size of the store file
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
index 508b4a7..9a9eb4c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java
@@ -447,10 +447,11 @@ public class HRegionFileSystem {
* @param familyName Family that will gain the file
* @param srcPath {@link Path} to the file to import
* @param seqNum Bulk Load sequence number
+ * @param copyFile if true, always copy the hfile into the store, even when the
+ *   source and destination are on the same filesystem (instead of moving it)
* @return The destination {@link Path} of the bulk loaded file
* @throws IOException
*/
- Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum)
+ Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum, boolean copyFile)
throws IOException {
// Copy the file if it's on another filesystem
FileSystem srcFs = srcPath.getFileSystem(conf);
@@ -459,7 +460,7 @@ public class HRegionFileSystem {
// We can't compare FileSystem instances as equals() includes UGI instance
// as part of the comparison and won't work when doing SecureBulkLoad
// TODO deal with viewFS
- if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
+ if (copyFile || !FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) {
LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " +
"the destination store. Copying file over to destination filesystem.");
Path tmpPath = createTempName();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c4bd849..600e183 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -735,8 +735,13 @@ public class HStore implements Store {
@Override
public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+ return bulkLoadHFile(srcPathStr, seqNum, false);
+ }
+
+ @Override
+ public Path bulkLoadHFile(String srcPathStr, long seqNum, boolean copyFile) throws IOException {
Path srcPath = new Path(srcPathStr);
- Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
+ Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum, copyFile);
LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as "
+ dstPath + " - updating store file list.");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 6f92f9d..bd14ea2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2077,7 +2077,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
if (!bypass) {
- loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+ loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
+ request.getCopyFile());
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index efd68b8..aa9093c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -553,6 +553,21 @@ public interface Region extends ConfigurationObserver {
boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException;
+ /**
+ * Attempts to atomically load a group of hfiles. This is critical for loading
+ * rows with multiple column families atomically.
+ *
+ * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+ * @param assignSeqId
+ * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
+ * file about to be bulk loaded
+ * @param copyFile if true, copy the hfiles into place rather than moving them,
+ *   even when the source and destination filesystems are the same
+ * @return true if successful, false if failed recoverably
+ * @throws IOException if failed unrecoverably.
+ */
+ boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId,
+ BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException;
+
///////////////////////////////////////////////////////////////////////////
// Coprocessors
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 9f53ac5..1960a7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -247,7 +247,7 @@ public class SecureBulkLoadManager {
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
- new SecureBulkLoadListener(fs, bulkToken, conf));
+ new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 853a4cf..3caa9e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -300,6 +300,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
*/
Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
+ /**
+ * This method should only be called from Region. It is assumed that the ranges of values in the
+ * HFile fit within the store's assigned region. (assertBulkLoadHFileOk checks this)
+ *
+ * @param srcPathStr path of the hfile to be bulk loaded into this store
+ * @param sequenceId sequence Id associated with the HFile
+ * @param copyFile if true, copy the hfile instead of moving it, even when source
+ *   and destination are on the same filesystem
+ */
+ Path bulkLoadHFile(String srcPathStr, long sequenceId, boolean copyFile) throws IOException;
+
// General accessors into the state of the store
// TODO abstract some of this out into a metrics class