diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 4e9e0b4..f0141df 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -21077,6 +21077,16 @@ public final class ClientProtos { */ com.google.protobuf.ByteString getBulkTokenBytes(); + + // optional bool copy_file = 6 [default = false]; + /** + * optional bool copy_file = 6 [default = false]; + */ + boolean hasCopyFile(); + /** + * optional bool copy_file = 6 [default = false]; + */ + boolean getCopyFile(); } /** * Protobuf type {@code hbase.pb.BulkLoadHFileRequest} @@ -21179,6 +21189,11 @@ public final class ClientProtos { bulkToken_ = input.readBytes(); break; } + case 48: { + bitField0_ |= 0x00000010; + copyFile_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -21979,12 +21994,29 @@ public final class ClientProtos { } } + // optional bool copy_file = 6 [default = false]; + public static final int COPY_FILE_FIELD_NUMBER = 6; + private boolean copyFile_; + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean hasCopyFile() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean getCopyFile() { + return copyFile_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); familyPath_ = java.util.Collections.emptyList(); assignSeqNum_ = false; fsToken_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken.getDefaultInstance(); bulkToken_ = ""; + copyFile_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -22027,6 +22059,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(5, getBulkTokenBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBool(6, copyFile_); + } getUnknownFields().writeTo(output); } @@ -22056,6 +22091,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBytesSize(5, getBulkTokenBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(6, copyFile_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -22101,6 +22140,11 @@ public final class ClientProtos { result = result && getBulkToken() .equals(other.getBulkToken()); } + result = result && (hasCopyFile() == other.hasCopyFile()); + if (hasCopyFile()) { + result = result && (getCopyFile() + == other.getCopyFile()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -22134,6 +22178,10 @@ public final class ClientProtos { hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER; hash = (53 * hash) + getBulkToken().hashCode(); } + if (hasCopyFile()) { + hash = (37 * hash) + COPY_FILE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getCopyFile()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -22274,6 +22322,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000008); bulkToken_ = ""; bitField0_ = (bitField0_ & ~0x00000010); + copyFile_ = false; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -22335,6 +22385,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000008; } result.bulkToken_ = bulkToken_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000010; + } + result.copyFile_ = copyFile_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -22391,6 +22445,9 @@ public final class ClientProtos { bulkToken_ = other.bulkToken_; onChanged(); } + if (other.hasCopyFile()) { + setCopyFile(other.getCopyFile()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -23013,6 +23070,39 @@ public final class ClientProtos { return this; } + // optional bool copy_file = 6 [default = false]; + private boolean copyFile_ ; + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean hasCopyFile() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean getCopyFile() { + return copyFile_; + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public Builder setCopyFile(boolean value) { + bitField0_ |= 0x00000020; + copyFile_ = value; + onChanged(); + return this; + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public Builder clearCopyFile() { + bitField0_ = (bitField0_ & ~0x00000020); + copyFile_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest) } @@ -39320,81 +39410,81 @@ public final class ClientProtos { "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + - "s\"\206\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + + "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" + "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" + "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" + "\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" + - "en\022\022\n\nbulk_token\030\005 \001(\t\032*\n\nFamilyPath\022\016\n\006" + - "family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF", - "ileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegatio" + - "nToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002" + - " \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026P" + - "repareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(" + - "\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031" + - ".hbase.pb.RegionSpecifier\"-\n\027PrepareBulk" + - "LoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Clea" + - "nupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)" + - "\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecifie" + - "r\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproces", - "sorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_n" + - "ame\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reques" + - "t\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005v" + - "alue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031C" + - "oprocessorServiceRequest\022)\n\006region\030\001 \002(\013" + - "2\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(" + - "\0132 .hbase.pb.CoprocessorServiceCall\"o\n\032C" + - "oprocessorServiceResponse\022)\n\006region\030\001 \002(" + - "\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 " + - "\002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022", - "\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase" + - ".pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.p" + - "b.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.C" + - "oprocessorServiceCall\"k\n\014RegionAction\022)\n" + - "\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" + - "\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" + - ".pb.Action\"c\n\017RegionLoadStats\022\027\n\014memstor" + - "eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010" + - "\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Multi" + - "RegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.", - "pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase" + - ".pb.RegionLoadStats\"\336\001\n\021ResultOrExceptio" + - "n\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase" + - ".pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb" + - ".NameBytesPair\022:\n\016service_result\030\004 \001(\0132\"" + - ".hbase.pb.CoprocessorServiceResult\0220\n\tlo" + - "adStats\030\005 \001(\0132\031.hbase.pb.RegionLoadStats" + - "B\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOrEx" + - "ception\030\001 \003(\0132\033.hbase.pb.ResultOrExcepti" + - "on\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameByt", - "esPair\"x\n\014MultiRequest\022,\n\014regionAction\030\001" + - " \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGro" + - "up\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.C" + - "ondition\"\226\001\n\rMultiResponse\0228\n\022regionActi" + - "onResult\030\001 \003(\0132\034.hbase.pb.RegionActionRe" + - "sult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatist" + - "ics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStat" + - "s*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE" + - "\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.G" + - "etRequest\032\025.hbase.pb.GetResponse\022;\n\006Muta", - "te\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.M" + - "utateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReq" + - "uest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoad" + - "HFile\022\036.hbase.pb.BulkLoadHFileRequest\032\037." + - "hbase.pb.BulkLoadHFileResponse\022V\n\017Prepar" + - "eBulkLoad\022 .hbase.pb.PrepareBulkLoadRequ" + - "est\032!.hbase.pb.PrepareBulkLoadResponse\022V" + - "\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBulk" + - "LoadRequest\032!.hbase.pb.CleanupBulkLoadRe" + - "sponse\022X\n\013ExecService\022#.hbase.pb.Coproce", - "ssorServiceRequest\032$.hbase.pb.Coprocesso" + - "rServiceResponse\022d\n\027ExecRegionServerServ" + - "ice\022#.hbase.pb.CoprocessorServiceRequest" + - "\032$.hbase.pb.CoprocessorServiceResponse\0228" + - "\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbase." + - "pb.MultiResponseBB\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\014ClientProtosH\001\210\001" + - "\001\240\001\001" + "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(" + + "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014", + "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" + + "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" + + "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" + + " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" + + "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" + + "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" + + "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" + + "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" + + "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013" + + "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu", + "lkLoadResponse\"a\n\026CoprocessorServiceCall" + + "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" + + "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" + + "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" + + "base.pb.NameBytesPair\"v\n\031CoprocessorServ" + + "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" + + "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" + + "oprocessorServiceCall\"o\n\032CoprocessorServ" + + "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" + + "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb", + ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" + + "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" + + "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" + + "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" + + "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" + + "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" + + "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" + + "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" + + "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction" + + "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat", + "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" + + "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" + + "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" + + "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" + + "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" + + "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" + + "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" + + "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" + + "ActionResult\0226\n\021resultOrException\030\001 \003(\0132" + + "\033.hbase.pb.ResultOrException\022*\n\texceptio", + "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" + + "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" + + "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" + + "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" + + "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" + + "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" + + "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" + + "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" + + "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS" + + "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb", + "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." + + "MutateRequest\032\030.hbase.pb.MutateResponse\022" + + "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" + + "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." + + "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" + + "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" + + "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" + + ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" + + "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!." + + "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec", + "Service\022#.hbase.pb.CoprocessorServiceReq" + + "uest\032$.hbase.pb.CoprocessorServiceRespon" + + "se\022d\n\027ExecRegionServerService\022#.hbase.pb" + + ".CoprocessorServiceRequest\032$.hbase.pb.Co" + + "processorServiceResponse\0228\n\005Multi\022\026.hbas" + + "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" + + "seBB\n*org.apache.hadoop.hbase.protobuf.g" + + "eneratedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -39502,7 +39592,7 @@ public final class ClientProtos { internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_BulkLoadHFileRequest_descriptor, - new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", }); + new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "CopyFile", }); internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor = internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0); internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index adb66f7..6c0c00c 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -339,6 +339,7 @@ message BulkLoadHFileRequest { optional bool assign_seq_num = 3; optional DelegationToken fs_token = 4; optional string bulk_token = 5; + optional bool copy_file = 6 [default = false]; message FamilyPath { required bytes family = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index debaec9..cf7ffc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5422,6 +5422,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException { + return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false); + } + + @Override + public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, + BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException { long seqId = -1; Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); Map storeFilesSizes = new HashMap(); @@ -5505,7 +5511,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (bulkLoadListener != null) { finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); } - Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId); + Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId, copyFile); // Note the size of the store file try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java index 508b4a7..9a9eb4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionFileSystem.java @@ -447,10 +447,11 @@ public class HRegionFileSystem { * @param familyName Family that will gain the file * @param srcPath {@link Path} to the file to import * @param seqNum Bulk Load sequence number + * @param copyFile always copy hfiles if true * @return The destination {@link Path} of the bulk loaded file * @throws IOException */ - Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum) + Path bulkLoadStoreFile(final String familyName, Path srcPath, long seqNum, boolean copyFile) throws IOException { // Copy the file if it's on another filesystem FileSystem srcFs = srcPath.getFileSystem(conf); @@ -459,7 +460,7 @@ public class HRegionFileSystem { // We can't compare FileSystem instances as equals() includes UGI instance // as part of the comparison and won't work when doing SecureBulkLoad // TODO deal with viewFS - if (!FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) { + if (copyFile || !FSHDFSUtils.isSameHdfs(conf, srcFs, desFs)) { LOG.info("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination store. Copying file over to destination filesystem."); Path tmpPath = createTempName(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c4bd849..600e183 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -735,8 +735,13 @@ public class HStore implements Store { @Override public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException { + return bulkLoadHFile(srcPathStr, seqNum, false); + } + + @Override + public Path bulkLoadHFile(String srcPathStr, long seqNum, boolean copyFile) throws IOException { Path srcPath = new Path(srcPathStr); - Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum); + Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum, copyFile); LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as " + dstPath + " - updating store file list."); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 6f92f9d..bd14ea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2077,7 +2077,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); + loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, + request.getCopyFile()); } if (region.getCoprocessorHost() != null) { loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index efd68b8..aa9093c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -553,6 +553,21 @@ public interface Region extends ConfigurationObserver { boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException; + /** + * Attempts to atomically load a group of hfiles. This is critical for loading + * rows with multiple column families atomically. + * + * @param familyPaths List of Pair<byte[] column family, String hfilePath> + * @param assignSeqId + * @param bulkLoadListener Internal hooks enabling massaging/preparation of a + * file about to be bulk loaded + * @param copyFile always copy hfiles if true + * @return true if successful, false if failed recoverably + * @throws IOException if failed unrecoverably. + */ + boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, + BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException; + /////////////////////////////////////////////////////////////////////////// // Coprocessors diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 9f53ac5..1960a7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -247,7 +247,7 @@ public class SecureBulkLoadManager { //We call bulkLoadHFiles as requesting user //To enable access prior to staging return region.bulkLoadHFiles(familyPaths, true, - new SecureBulkLoadListener(fs, bulkToken, conf)); + new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile()); } catch (Exception e) { LOG.error("Failed to complete bulk load", e); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 853a4cf..3caa9e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -300,6 +300,16 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf */ Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException; + /** + * This method should only be called from Region. It is assumed that the ranges of values in the + * HFile fit within the stores assigned region. (assertBulkLoadHFileOk checks this) + * + * @param srcPathStr + * @param sequenceId sequence Id associated with the HFile + * @param copyFile always copy hfiles if true + */ + Path bulkLoadHFile(String srcPathStr, long sequenceId, boolean copyFile) throws IOException; + // General accessors into the state of the store // TODO abstract some of this out into a metrics class