diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index eddf8f1..5af8034 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -113,9 +113,30 @@ public class SecureBulkLoadClient { final List> familyPaths, final byte[] regionName, boolean assignSeqNum, final Token userToken, final String bulkToken) throws IOException { + return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken, + false); + } + + /** + * Securely bulk load a list of HFiles using client protocol. + * + * @param client + * @param familyPaths + * @param regionName + * @param assignSeqNum + * @param userToken + * @param bulkToken + * @param copyFiles + * @return true if all are loaded + * @throws IOException + */ + public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, + final List> familyPaths, + final byte[] regionName, boolean assignSeqNum, + final Token userToken, final String bulkToken, boolean copyFiles) throws IOException { BulkLoadHFileRequest request = RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, - userToken, bulkToken); + userToken, bulkToken, copyFiles); try { BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index b75d2b8..4151a0c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -528,6 +528,22 @@ public final class RequestConverter { final List> familyPaths, final byte[] regionName, boolean assignSeqNum, final Token userToken, final String bulkToken) { + return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken, + false); + } + + /** + * Create a protocol buffer bulk load request + * + * @param familyPaths + * @param regionName + * @param assignSeqNum + * @return a bulk load request + */ + public static BulkLoadHFileRequest buildBulkLoadHFileRequest( + final List> familyPaths, + final byte[] regionName, boolean assignSeqNum, + final Token userToken, final String bulkToken, boolean copyFiles) { RegionSpecifier region = RequestConverter.buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); @@ -560,6 +576,7 @@ public final class RequestConverter { if (bulkToken != null) { request.setBulkToken(bulkToken); } + request.setCopyFile(copyFiles); return request.build(); } diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java index 4e9e0b4..f0141df 100644 --- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java +++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java @@ -21077,6 +21077,16 @@ public final class ClientProtos { */ com.google.protobuf.ByteString getBulkTokenBytes(); + + // optional bool copy_file = 6 [default = false]; + /** + * optional bool copy_file = 6 [default = false]; + */ + boolean hasCopyFile(); + /** + * optional bool copy_file = 6 [default = 
false]; + */ + boolean getCopyFile(); } /** * Protobuf type {@code hbase.pb.BulkLoadHFileRequest} @@ -21179,6 +21189,11 @@ public final class ClientProtos { bulkToken_ = input.readBytes(); break; } + case 48: { + bitField0_ |= 0x00000010; + copyFile_ = input.readBool(); + break; + } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { @@ -21979,12 +21994,29 @@ public final class ClientProtos { } } + // optional bool copy_file = 6 [default = false]; + public static final int COPY_FILE_FIELD_NUMBER = 6; + private boolean copyFile_; + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean hasCopyFile() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean getCopyFile() { + return copyFile_; + } + private void initFields() { region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance(); familyPath_ = java.util.Collections.emptyList(); assignSeqNum_ = false; fsToken_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken.getDefaultInstance(); bulkToken_ = ""; + copyFile_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -22027,6 +22059,9 @@ public final class ClientProtos { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(5, getBulkTokenBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeBool(6, copyFile_); + } getUnknownFields().writeTo(output); } @@ -22056,6 +22091,10 @@ public final class ClientProtos { size += com.google.protobuf.CodedOutputStream .computeBytesSize(5, getBulkTokenBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(6, copyFile_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -22101,6 +22140,11 @@ public final class ClientProtos { result = result && getBulkToken() .equals(other.getBulkToken()); } + result = result && (hasCopyFile() == other.hasCopyFile()); + if (hasCopyFile()) { + result = result && (getCopyFile() + == other.getCopyFile()); + } result = result && getUnknownFields().equals(other.getUnknownFields()); return result; @@ -22134,6 +22178,10 @@ public final class ClientProtos { hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER; hash = (53 * hash) + getBulkToken().hashCode(); } + if (hasCopyFile()) { + hash = (37 * hash) + COPY_FILE_FIELD_NUMBER; + hash = (53 * hash) + hashBoolean(getCopyFile()); + } hash = (29 * hash) + getUnknownFields().hashCode(); memoizedHashCode = hash; return hash; @@ -22274,6 +22322,8 @@ public final class ClientProtos { bitField0_ = (bitField0_ & ~0x00000008); bulkToken_ = ""; bitField0_ = (bitField0_ & ~0x00000010); + copyFile_ = false; + bitField0_ = (bitField0_ & ~0x00000020); return this; } @@ -22335,6 +22385,10 @@ public final class ClientProtos { to_bitField0_ |= 0x00000008; } result.bulkToken_ = bulkToken_; + if (((from_bitField0_ & 0x00000020) == 0x00000020)) { + to_bitField0_ |= 0x00000010; + } + result.copyFile_ = copyFile_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -22391,6 +22445,9 @@ public final class ClientProtos { bulkToken_ = other.bulkToken_; onChanged(); } + if (other.hasCopyFile()) { + setCopyFile(other.getCopyFile()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -23013,6 +23070,39 @@ public final class ClientProtos { return this; } + // optional bool copy_file = 6 [default = false]; + 
private boolean copyFile_ ; + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean hasCopyFile() { + return ((bitField0_ & 0x00000020) == 0x00000020); + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public boolean getCopyFile() { + return copyFile_; + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public Builder setCopyFile(boolean value) { + bitField0_ |= 0x00000020; + copyFile_ = value; + onChanged(); + return this; + } + /** + * optional bool copy_file = 6 [default = false]; + */ + public Builder clearCopyFile() { + bitField0_ = (bitField0_ & ~0x00000020); + copyFile_ = false; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest) } @@ -39320,81 +39410,81 @@ public final class ClientProtos { "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" + "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" + "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" + - "s\"\206\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + + "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" + "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" + "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" + "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" + "\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" + - "en\022\022\n\nbulk_token\030\005 \001(\t\032*\n\nFamilyPath\022\016\n\006" + - "family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF", - "ileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegatio" + - "nToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002" + - " \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026P" + - "repareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(" + - "\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031" + - ".hbase.pb.RegionSpecifier\"-\n\027PrepareBulk" + - "LoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Clea" + - "nupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)" + - "\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecifie" + - "r\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproces", - "sorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_n" + - "ame\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reques" + - "t\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005v" + - "alue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031C" + - "oprocessorServiceRequest\022)\n\006region\030\001 \002(\013" + - "2\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(" + - "\0132 .hbase.pb.CoprocessorServiceCall\"o\n\032C" + - "oprocessorServiceResponse\022)\n\006region\030\001 \002(" + - "\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 " + - "\002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022", - "\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase" + - ".pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.p" + - "b.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.C" + - "oprocessorServiceCall\"k\n\014RegionAction\022)\n" + - "\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" + - "\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" + - ".pb.Action\"c\n\017RegionLoadStats\022\027\n\014memstor" + - "eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 
\001(\005:\0010" + - "\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Multi" + - "RegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.", - "pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase" + - ".pb.RegionLoadStats\"\336\001\n\021ResultOrExceptio" + - "n\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase" + - ".pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb" + - ".NameBytesPair\022:\n\016service_result\030\004 \001(\0132\"" + - ".hbase.pb.CoprocessorServiceResult\0220\n\tlo" + - "adStats\030\005 \001(\0132\031.hbase.pb.RegionLoadStats" + - "B\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOrEx" + - "ception\030\001 \003(\0132\033.hbase.pb.ResultOrExcepti" + - "on\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameByt", - "esPair\"x\n\014MultiRequest\022,\n\014regionAction\030\001" + - " \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGro" + - "up\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.C" + - "ondition\"\226\001\n\rMultiResponse\0228\n\022regionActi" + - "onResult\030\001 \003(\0132\034.hbase.pb.RegionActionRe" + - "sult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatist" + - "ics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStat" + - "s*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE" + - "\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.G" + - "etRequest\032\025.hbase.pb.GetResponse\022;\n\006Muta", - "te\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.M" + - "utateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReq" + - "uest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoad" + - "HFile\022\036.hbase.pb.BulkLoadHFileRequest\032\037." + - "hbase.pb.BulkLoadHFileResponse\022V\n\017Prepar" + - "eBulkLoad\022 .hbase.pb.PrepareBulkLoadRequ" + - "est\032!.hbase.pb.PrepareBulkLoadResponse\022V" + - "\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBulk" + - "LoadRequest\032!.hbase.pb.CleanupBulkLoadRe" + - "sponse\022X\n\013ExecService\022#.hbase.pb.Coproce", - "ssorServiceRequest\032$.hbase.pb.Coprocesso" + - "rServiceResponse\022d\n\027ExecRegionServerServ" + - "ice\022#.hbase.pb.CoprocessorServiceRequest" + - "\032$.hbase.pb.CoprocessorServiceResponse\0228" + - "\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbase." 
+ - "pb.MultiResponseBB\n*org.apache.hadoop.hb" + - "ase.protobuf.generatedB\014ClientProtosH\001\210\001" + - "\001\240\001\001" + "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(" + + "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014", + "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" + + "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" + + "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" + + " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" + + "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" + + "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" + + "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" + + "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" + + "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013" + + "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu", + "lkLoadResponse\"a\n\026CoprocessorServiceCall" + + "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" + + "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" + + "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" + + "base.pb.NameBytesPair\"v\n\031CoprocessorServ" + + "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" + + "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" + + "oprocessorServiceCall\"o\n\032CoprocessorServ" + + "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" + + "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb", + ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" + + "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" + + "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" + + "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" + + "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" + + "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" + + "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" + + "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" + + "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction" + + "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat", + "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" + + "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" + + "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" + + "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" + + "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" + + "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" + + "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" + + "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" + + "ActionResult\0226\n\021resultOrException\030\001 \003(\0132" + + "\033.hbase.pb.ResultOrException\022*\n\texceptio", + "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" + + "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" + + "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" + + "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" + + "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" + + "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" + + "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 
\001(\0132\036.hb" + + "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" + + "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS" + + "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb", + "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." + + "MutateRequest\032\030.hbase.pb.MutateResponse\022" + + "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" + + "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." + + "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" + + "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" + + "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" + + ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" + + "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!." + + "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec", + "Service\022#.hbase.pb.CoprocessorServiceReq" + + "uest\032$.hbase.pb.CoprocessorServiceRespon" + + "se\022d\n\027ExecRegionServerService\022#.hbase.pb" + + ".CoprocessorServiceRequest\032$.hbase.pb.Co" + + "processorServiceResponse\0228\n\005Multi\022\026.hbas" + + "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" + + "seBB\n*org.apache.hadoop.hbase.protobuf.g" + + "eneratedB\014ClientProtosH\001\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -39502,7 +39592,7 @@ public final class ClientProtos { internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_hbase_pb_BulkLoadHFileRequest_descriptor, - new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", }); + new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "CopyFile", }); internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor = internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0); internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index adb66f7..6c0c00c 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -339,6 +339,7 @@ message BulkLoadHFileRequest { optional bool assign_seq_num = 3; optional DelegationToken fs_token = 4; optional string bulk_token = 5; + optional bool copy_file = 6 [default = false]; message FamilyPath { required bytes family = 1; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index e3542ef8..c831efd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -115,6 +115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers"; public final static String CREATE_TABLE_CONF_KEY = "create.table"; public final static String SILENCE_CONF_KEY = "ignore.unmatched.families"; + public final static String ALWAYS_COPY_FILES = "always.copy.files"; // We use a '.' prefix which is ignored when walking directory trees // above. It is invalid family name. 
@@ -328,7 +329,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { */ public void doBulkLoad(Path hfofDir, final Admin admin, Table table, RegionLocator regionLocator) throws TableNotFoundException, IOException { - doBulkLoad(hfofDir, admin, table, regionLocator, false); + doBulkLoad(hfofDir, admin, table, regionLocator, false, false); } void cleanup(Admin admin, Deque queue, ExecutorService pool, @@ -360,10 +361,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @param table the table to load into * @param regionLocator region locator * @param silence true to ignore unmatched column families + * @param copyFile always copy hfiles if true * @throws TableNotFoundException if table does not yet exist */ public void doBulkLoad(Map> map, final Admin admin, Table table, - RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException { + RegionLocator regionLocator, boolean silence, boolean copyFile) + throws TableNotFoundException, IOException { if (!admin.isTableAvailable(regionLocator.getName())) { throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); } @@ -386,7 +389,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { break; } } - performBulkLoad(admin, table, regionLocator, queue, pool, secureClient); + performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); } finally { cleanup(admin, queue, pool, secureClient); } @@ -402,10 +405,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @param table the table to load into * @param regionLocator region locator * @param silence true to ignore unmatched column families + * @param copyFile always copy hfiles if true * @throws TableNotFoundException if table does not yet exist */ public void doBulkLoad(Path hfofDir, final Admin admin, Table table, - RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException { + RegionLocator regionLocator, boolean silence, boolean copyFile) + throws TableNotFoundException, IOException { if (!admin.isTableAvailable(regionLocator.getName())) { throw new TableNotFoundException("Table " + table.getName() + " is not currently available."); } @@ -437,7 +442,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } pool = createExecutorService(); secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); - performBulkLoad(admin, table, regionLocator, queue, pool, secureClient); + performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile); } finally { cleanup(admin, queue, pool, secureClient); } @@ -445,7 +450,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator, Deque queue, ExecutorService pool, - SecureBulkLoadClient secureClient) throws IOException { + SecureBulkLoadClient secureClient, boolean copyFile) throws IOException { int count = 0; if(isSecureBulkLoadEndpointAvailable()) { @@ -486,7 +491,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { + " hfiles to one family of one region"); } - bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups); + bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile); // NOTE: The next iteration's split / group could happen in parallel to // atomic bulkloads assuming that there are splits and no merges, and @@ -599,12 +604,29 @@ 
public class LoadIncrementalHFiles extends Configured implements Tool { */ public void loadHFileQueue(final Table table, final Connection conn, Deque queue, Pair startEndKeys) throws IOException { + loadHFileQueue(table, conn, queue, startEndKeys, false); + } + + /** + * Used by the replication sink to load the hfiles from the source cluster. It does the following, + *
+ * <ol>
+ * <li>{@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
+ * <li>{@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}</li>
+ * </ol>
+ * @param table Table to which these hfiles should be loaded to + * @param conn Connection to use + * @param queue {@link LoadQueueItem} has hfiles yet to be loaded + * @param startEndKeys starting and ending row keys of the region + */ + public void loadHFileQueue(final Table table, final Connection conn, Deque queue, + Pair startEndKeys, boolean copyFile) throws IOException { ExecutorService pool = null; try { pool = createExecutorService(); Multimap regionGroups = groupOrSplitPhase(table, pool, queue, startEndKeys); - bulkLoadPhase(table, conn, pool, queue, regionGroups); + bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile); } finally { if (pool != null) { pool.shutdown(); @@ -619,7 +641,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { */ protected void bulkLoadPhase(final Table table, final Connection conn, ExecutorService pool, Deque queue, - final Multimap regionGroups) throws IOException { + final Multimap regionGroups, boolean copyFile) throws IOException { // atomically bulk load the groups. Set>> loadingFutures = new HashSet<>(); for (Entry> e: regionGroups.asMap().entrySet()){ @@ -630,7 +652,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { @Override public List call() throws Exception { List toRetry = - tryAtomicRegionLoad(conn, table.getName(), first, lqis); + tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile); return toRetry; } }; @@ -890,8 +912,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * failure */ protected List tryAtomicRegionLoad(final Connection conn, - final TableName tableName, final byte[] first, final Collection lqis) - throws IOException { + final TableName tableName, final byte[] first, final Collection lqis, + boolean copyFile) throws IOException { final List> famPaths = new ArrayList<>(lqis.size()); for (LoadQueueItem lqi : lqis) { if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) { @@ -911,7 +933,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try (Table table = conn.getTable(getTableName())) { secureClient = new SecureBulkLoadClient(getConf(), table); success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, - assignSeqIds, fsDelegationToken.getUserToken(), bulkToken); + assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile); } return success; } finally { @@ -1172,10 +1194,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try (Table table = connection.getTable(tableName); RegionLocator locator = connection.getRegionLocator(tableName)) { boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, "")); + boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, "")); if (dirPath != null) { - doBulkLoad(hfofDir, admin, table, locator, silence); + doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles); } else { - doBulkLoad(map, admin, table, locator, silence); + doBulkLoad(map, admin, table, locator, silence, copyFiles); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index debaec9..f6d2e36 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5422,6 +5422,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public 
boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException { + return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false); + } + + @Override + public boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, + BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException { long seqId = -1; Map> storeFiles = new TreeMap>(Bytes.BYTES_COMPARATOR); Map storeFilesSizes = new HashMap(); @@ -5503,7 +5509,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { String finalPath = path; if (bulkLoadListener != null) { - finalPath = bulkLoadListener.prepareBulkLoad(familyName, path); + finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile); } Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5ba8afd..541680c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -2078,7 +2078,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); } if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); + loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, + request.getCopyFile()); } if (region.getCoprocessorHost() != null) { loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index efd68b8..4e76e76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -512,7 +512,6 @@ public interface Region extends ConfigurationObserver { * pre/post processing of a given bulkload call */ interface BulkLoadListener { - /** * Called before an HFile is actually loaded * @param family family being loaded to @@ -520,7 +519,8 @@ public interface Region extends ConfigurationObserver { * @return final path to be used for actual loading * @throws IOException */ - String prepareBulkLoad(byte[] family, String srcPath) throws IOException; + String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile) + throws IOException; /** * Called after a successful HFile load @@ -553,6 +553,21 @@ public interface Region extends ConfigurationObserver { boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException; + /** + * Attempts to atomically load a group of hfiles. This is critical for loading + * rows with multiple column families atomically. + * + * @param familyPaths List of Pair<byte[] column family, String hfilePath> + * @param assignSeqId + * @param bulkLoadListener Internal hooks enabling massaging/preparation of a + * file about to be bulk loaded + * @param copyFile always copy hfiles if true + * @return true if successful, false if failed recoverably + * @throws IOException if failed unrecoverably. 
+ */ + boolean bulkLoadHFiles(Collection> familyPaths, boolean assignSeqId, + BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException; + /////////////////////////////////////////////////////////////////////////// // Coprocessors diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java index 0d64e4e..88c993e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -233,7 +233,7 @@ public class SecureBulkLoadManager { //We call bulkLoadHFiles as requesting user //To enable access prior to staging return region.bulkLoadHFiles(familyPaths, true, - new SecureBulkLoadListener(fs, bulkToken, conf)); + new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile()); } catch (Exception e) { LOG.error("Failed to complete bulk load", e); } @@ -305,7 +305,8 @@ public class SecureBulkLoadManager { } @Override - public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { + public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile) + throws IOException { Path p = new Path(srcPath); Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); @@ -329,6 +330,9 @@ public class SecureBulkLoadManager { LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + "the destination filesystem. Copying file over to destination staging dir."); FileUtil.copy(srcFs, p, fs, stageP, false, conf); + } else if (copyFile) { + LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir."); + FileUtil.copy(srcFs, p, fs, stageP, false, conf); } else { LOG.debug("Moving " + p + " to " + stageP); FileStatus origFileStatus = fs.getFileStatus(p); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java index 0b96720..88b9247 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java @@ -131,6 +131,17 @@ public class TestLoadIncrementalHFiles { }); } + @Test(timeout = 120000) + public void testSimpleLoadWithFileCopy() throws Exception { + String testName = "mytable_testSimpleLoadWithFileCopy"; + final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName); + runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE, + false, null, new byte[][][] { + new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, + }, false, true); + } + /** * Test case that creates some regions and loads * HFiles that cross the boundaries of those regions @@ -291,12 +302,12 @@ public class TestLoadIncrementalHFiles { boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception { HTableDescriptor htd = buildHTD(tableName, bloomType); - runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap); + runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false); } private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, - 
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) - throws Exception { + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, + boolean copyFiles) throws Exception { Path dir = util.getDataTestDirOnTestFS(testName); FileSystem fs = util.getTestFileSystem(); dir = dir.makeQualified(fs); @@ -305,9 +316,11 @@ public class TestLoadIncrementalHFiles { int hfileIdx = 0; Map> map = null; List list = null; + if (useMap || copyFiles) { + list = new ArrayList<>(); + } if (useMap) { map = new TreeMap>(Bytes.BYTES_COMPARATOR); - list = new ArrayList<>(); map.put(FAMILY, list); } for (byte[][] range : hfileRanges) { @@ -326,7 +339,11 @@ public class TestLoadIncrementalHFiles { } final TableName tableName = htd.getTableName(); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()); + Configuration conf = util.getConfiguration(); + if (copyFiles) { + conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true); + } + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); String [] args= {dir.toString(), tableName.toString()}; if (useMap) { loader.run(null, map, tableName); @@ -334,6 +351,12 @@ public class TestLoadIncrementalHFiles { loader.run(args); } + if (copyFiles) { + for (Path p : list) { + assertTrue(fs.exists(p)); + } + } + Table table = util.getConnection().getTable(tableName); try { assertEquals(expectedRows, util.countRows(table)); @@ -419,7 +442,7 @@ public class TestLoadIncrementalHFiles { htd.addFamily(family); try { - runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false); + runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false); assertTrue("Loading into table with non-existent family should have failed", false); } catch (Exception e) { assertTrue("IOException expected", e instanceof IOException); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java index 66d7eb1..90a1409 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java @@ -281,8 +281,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) { @Override protected List tryAtomicRegionLoad(final Connection conn, - TableName tableName, final byte[] first, Collection lqis) - throws IOException { + TableName tableName, final byte[] first, Collection lqis, + boolean copyFile) throws IOException { int i = attmptedCalls.incrementAndGet(); if (i == 1) { Connection errConn; @@ -293,10 +293,10 @@ public class TestLoadIncrementalHFilesSplitRecovery { throw new RuntimeException("mocking cruft, should never happen"); } failedCalls.incrementAndGet(); - return super.tryAtomicRegionLoad(errConn, tableName, first, lqis); + return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, copyFile); } - return super.tryAtomicRegionLoad(conn, tableName, first, lqis); + return super.tryAtomicRegionLoad(conn, tableName, first, lqis, copyFile); } }; try { @@ -359,13 +359,14 @@ public class TestLoadIncrementalHFilesSplitRecovery { @Override protected void bulkLoadPhase(final Table htable, final Connection conn, ExecutorService pool, Deque queue, - final Multimap regionGroups) 
throws IOException { + final Multimap regionGroups, boolean copyFile) + throws IOException { int i = attemptedCalls.incrementAndGet(); if (i == 1) { // On first attempt force a split. forceSplit(table); } - super.bulkLoadPhase(htable, conn, pool, queue, regionGroups); + super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile); } };