diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 215069c..11697f6 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.EOFException;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -4534,6 +4535,83 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
+  void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
+    checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
+      "BulkLoad marker from WAL ", bulkLoadEvent);
+
+    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+      return; // if primary nothing to do
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getRegionInfo().getEncodedName() + " : "
+        + "Replaying bulkload event marker " + TextFormat.shortDebugString(bulkLoadEvent));
+    }
+    // check if multiple families are involved
+    boolean multipleFamilies = false;
+    byte[] family = null;
+    for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
+      byte[] fam = storeDescriptor.getFamilyName().toByteArray();
+      if (family == null) {
+        family = fam;
+      } else if (!Bytes.equals(family, fam)) {
+        multipleFamilies = true;
+        break;
+      }
+    }
+
+    startBulkRegionOperation(multipleFamilies);
+    try {
+      // we will use writestate as a coarse-grained lock for all the replay events
+      synchronized (writestate) {
+        // Replication can deliver events out of order when the primary region moves or the region
+        // server crashes, since there is no coordination between replication of different wal files
+        // belonging to different region servers. We have to safeguard against this case by using
+        // the region open event's seqid. Since this is the first event that the region puts (after
+        // possibly flushing recovered.edits), after seeing this event, we can ignore every edit
+        // smaller than this seqId
+        if (bulkLoadEvent.getBulkloadSeqNum() >= 0
+            && this.lastReplayedOpenRegionSeqId >= bulkLoadEvent.getBulkloadSeqNum()) {
+          LOG.warn(getRegionInfo().getEncodedName() + " : "
+            + "Skipping replaying bulkload event :"
+            + TextFormat.shortDebugString(bulkLoadEvent)
+            + " because its sequence id is smaller than this region's lastReplayedOpenRegionSeqId"
+            + " =" + lastReplayedOpenRegionSeqId);
+          return;
+        }
+
+        for (StoreDescriptor storeDescriptor : bulkLoadEvent.getStoresList()) {
+          // stores of primary may be different now
+          family = storeDescriptor.getFamilyName().toByteArray();
+          Store store = getStore(family);
+          if (store == null) {
+            LOG.warn(getRegionInfo().getEncodedName() + " : "
+              + "Received a bulk load marker from primary, but the family is not found. "
+              + "Ignoring. StoreDescriptor:" + storeDescriptor);
+            continue;
+          }
+
+          List<String> storeFiles = storeDescriptor.getStoreFileList();
+          for (String storeFile : storeFiles) {
+            StoreFileInfo storeFileInfo = fs.getStoreFileInfo(Bytes.toString(family), storeFile);
+            try {
+              store.bulkLoadHFile(storeFileInfo);
+            } catch (FileNotFoundException ex) {
+              LOG.warn(getRegionInfo().getEncodedName() + " : " + storeFileInfo
+                + " doesn't exist any more. Skip loading the file");
+            }
+          }
+        }
+      }
+      if (bulkLoadEvent.getBulkloadSeqNum() > 0) {
+        getMVCC().advanceMemstoreReadPointIfNeeded(bulkLoadEvent.getBulkloadSeqNum());
+      }
+    } finally {
+      closeBulkRegionOperation();
+    }
+  }
+
   /** Checks whether the given regionName is either equal to our region, or that
    * the regionName is the primary region to our corresponding range for the secondary replica.
    */
@@ -4884,13 +4962,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
         if (bulkLoadListener != null) {
           finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
         }
-        store.bulkLoadHFile(finalPath, seqId);
+        Path committedStoreFile = store.bulkLoadHFile(finalPath, seqId);
 
         if(storeFiles.containsKey(familyName)) {
-          storeFiles.get(familyName).add(new Path(finalPath));
+          storeFiles.get(familyName).add(committedStoreFile);
         } else {
           List<Path> storeFileNames = new ArrayList<Path>();
-          storeFileNames.add(new Path(finalPath));
+          storeFileNames.add(committedStoreFile);
           storeFiles.put(familyName, storeFileNames);
         }
         if (bulkLoadListener != null) {
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 7dbe55d..3b51c5a 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -781,19 +781,33 @@ public class HStore implements Store {
   }
 
   @Override
-  public void bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
+  public Path bulkLoadHFile(String srcPathStr, long seqNum) throws IOException {
     Path srcPath = new Path(srcPathStr);
     Path dstPath = fs.bulkLoadStoreFile(getColumnFamilyName(), srcPath, seqNum);
 
+    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() + "' as " +
+        dstPath + " - updating store file list.");
+
     StoreFile sf = createStoreFileAndReader(dstPath);
+    bulkLoadHFile(sf);
 
+    LOG.info("Successfully loaded store file " + srcPath + " into store " + this +
+        " (new location: " + dstPath + ")");
+
+    return dstPath;
+  }
+
+  @Override
+  public void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException {
+    StoreFile sf = createStoreFileAndReader(fileInfo);
+    bulkLoadHFile(sf);
+  }
+
+  private void bulkLoadHFile(StoreFile sf) throws IOException {
     StoreFile.Reader r = sf.getReader();
     this.storeSize += r.length();
     this.totalUncompressedBytes += r.getTotalUncompressedBytes();
 
-    LOG.info("Loaded HFile " + srcPath + " into store '" + getColumnFamilyName() +
-        "' as " + dstPath + " - updating store file list.");
-
     // Append the new storefile into the list
     this.lock.writeLock().lock();
     try {
@@ -807,8 +821,7 @@ public class HStore implements Store {
       this.lock.writeLock().unlock();
     }
     notifyChangedReadersObservers();
-    LOG.info("Successfully loaded store file " + srcPath
-        + " into store " + this + " (new location: " + dstPath + ")");
+    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName() + "'");
     if (LOG.isTraceEnabled()) {
       String traceMessage = "BULK LOAD time,size,store size,store files ["
         + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 4cd25da..3d86753 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -143,6 +143,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
@@ -732,6 +733,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             region.replayWALRegionEventMarker(regionEvent);
             continue;
           }
+          BulkLoadDescriptor bulkLoadEvent = WALEdit.getBulkLoadDescriptor(metaCell);
+          if (bulkLoadEvent != null) {
+            region.replayWALBulkLoadEventMarker(bulkLoadEvent);
+            continue;
+          }
         }
         it.remove();
       }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 6a422a9..b638a8f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -247,7 +247,7 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    * @param srcPathStr
    * @param sequenceId sequence Id associated with the HFile
    */
-  void bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
+  Path bulkLoadHFile(String srcPathStr, long sequenceId) throws IOException;
 
   // General accessors into the state of the store
   // TODO abstract some of this out into a metrics class
@@ -440,4 +440,5 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
    */
   void refreshStoreFiles(Collection<String> newFiles) throws IOException;
 
+  void bulkLoadHFile(StoreFileInfo fileInfo) throws IOException;
 }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 15dbef5..931306c 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -296,9 +296,6 @@ public class TestBulkLoad {
         assertTrue(Bytes.equals(store.getFamilyName().toByteArray(), familyName));
         assertTrue(Bytes.equals(Bytes.toBytes(store.getStoreHomeDir()), familyName));
         assertEquals(storeFileNames.size(), store.getStoreFileCount());
-        for (String storeFile : store.getStoreFileList()) {
-          assertTrue(storeFile.equals(storeFileNames.get(index++)));
-        }
       }
 
       return true;
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 09e9d5e..5d3fc45 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -28,32 +28,46 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.apache.hadoop.hbase.regionserver.TestHRegion.*;
 
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
 import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
 import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
@@ -62,6 +76,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
@@ -73,6 +88,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
 import com.google.common.collect.Lists;
@@ -1118,6 +1134,90 @@ public class TestHRegionReplayEvents {
     region.batchReplay(new MutationReplay[] {mutation}, replaySeqId);
   }
 
+  /**
+   * Tests replaying bulk load event markers from the primary region. Checks whether the bulk loaded files are picked up.
+   */
+  @Test
+  public void testReplayBulkLoadEvent() throws IOException {
+    LOG.info("testReplayBulkLoadEvent starts");
+    putDataWithFlushes(primaryRegion, 100, 0, 100); // no flush
+
+    // close the region and open again.
+    primaryRegion.close();
+    primaryRegion = HRegion.openHRegion(rootDir, primaryHri, htd, walPrimary, CONF, rss, null);
+
+    // bulk load a file into primary region
+    Random random = new Random();
+    byte[] randomValues = new byte[20];
+    random.nextBytes(randomValues);
+    Path testPath = TEST_UTIL.getDataTestDirOnTestFS();
+
+    List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
+    int expectedLoadFileCount = 0;
+    for (byte[] family : families) {
+      familyPaths.add(new Pair<byte[], String>(family, createHFileForFamilies(testPath, family,
+        randomValues)));
+      expectedLoadFileCount++;
+    }
+    primaryRegion.bulkLoadHFiles(familyPaths, false);
+
+    // now replay the edits and the bulk load marker
+    reader = createWALReaderForPrimary();
+
+    LOG.info("-- Replaying edits and region events in secondary");
+    BulkLoadDescriptor bulkloadEvent = null;
+    while (true) {
+      WAL.Entry entry = reader.next();
+      if (entry == null) {
+        break;
+      }
+      bulkloadEvent = WALEdit.getBulkLoadDescriptor(entry.getEdit().getCells().get(0));
+      if (bulkloadEvent != null) {
+        break;
+      }
+    }
+
+    // we should have 1 bulk load event
+    assertTrue(bulkloadEvent != null);
+    assertEquals(expectedLoadFileCount, bulkloadEvent.getStoresCount());
+
+    // replay the bulk load event
+    secondaryRegion.replayWALBulkLoadEventMarker(bulkloadEvent);
+
+    List<String> storeFileName = new ArrayList<String>();
+    for (StoreDescriptor storeDesc : bulkloadEvent.getStoresList()) {
+      storeFileName.addAll(storeDesc.getStoreFileList());
+    }
+    // assert that the bulk loaded files are picked up by the secondary region
+    for (Store s : secondaryRegion.getStores().values()) {
+      for (StoreFile sf : s.getStorefiles()) {
+        storeFileName.remove(sf.getPath().getName());
+      }
+    }
+    assertTrue("Found store files that were not loaded: " + storeFileName,
+      storeFileName.isEmpty());
+
+    LOG.info("-- Verifying edits from secondary");
+    for (byte[] family : families) {
+      assertGet(secondaryRegion, family, randomValues);
+    }
+  }
+
+  private String createHFileForFamilies(Path testPath, byte[] family,
+      byte[] valueBytes) throws IOException {
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(TEST_UTIL.getConfiguration());
+    // TODO We need a way to do this without creating files
+    Path testFile = new Path(testPath, UUID.randomUUID().toString());
+    hFileFactory.withOutputStream(TEST_UTIL.getTestFileSystem().create(testFile));
+    hFileFactory.withFileContext(new HFileContext());
+    HFile.Writer writer = hFileFactory.create();
+
+    writer.append(new KeyValue(CellUtil.createCell(valueBytes, family, valueBytes, 0L,
+      KeyValue.Type.Put.getCode(), valueBytes)));
+    writer.close();
+    return testFile.toString();
+  }
+
   /** Puts a total of numRows + numRowsAfterFlush records indexed with numeric row keys. Does
    * a flush every flushInterval number of records. Then it puts numRowsAfterFlush number of
    * more rows but does not execute flush after