diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 2ef7b20..b52c1e3 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -207,6 +207,26 @@ public class HRegion implements HeapSize { // , Writable{
   protected long completeSequenceId = -1L;
 
   /**
+   * Per region Sequence ID. This is used to generate sequenceIds while appending to the WAL.
+   */
+  private AtomicLong regionSequenceId = new AtomicLong();
+
+  /**
+   * @return current value of regionSequenceId.
+   */
+  public long getRegionSequenceId() {
+    return this.regionSequenceId.longValue();
+  }
+
+  /**
+   * Increment and return the value of regionSequenceId.
+   * @return incremented value of regionSequenceId.
+   */
+  public long incrementAndGetRegionSequenceId() {
+    return this.regionSequenceId.incrementAndGet();
+  }
+
+  /**
    * Operation enum is used in {@link HRegion#startRegionOperation} to provide operation context for
    * startRegionOperation to possibly invoke different checks before any region operations. Not all
    * operations have to be defined here. It's only needed when a special check is need in
@@ -584,7 +604,10 @@ public class HRegion implements HeapSize { // , Writable{
     if (nextSeqId == -1) {
       status
           .abort("Exception during region " + this.getRegionNameAsString() + " initialization.");
+    } else {
+      this.regionSequenceId.set(nextSeqId);
     }
+    LOG.debug("RegionSequenceId is initialized to: " + regionSequenceId.get());
   }
 }
@@ -1503,13 +1526,12 @@ public class HRegion implements HeapSize { // , Writable{
     mvcc.advanceMemstore(w);
 
     if (wal != null) {
-      Long startSeqId = wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
-      if (startSeqId == null) {
+      if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
         status.setStatus("Flush will not be started for ["
             + this.getRegionInfo().getEncodedName()
-          + "] - WAL is going away");
-        return false;
+            + "] - WAL is going away");
+        return false;
       }
-      flushSeqId = startSeqId;
+      flushSeqId = this.regionSequenceId.incrementAndGet();
     } else {
       flushSeqId = myseqid;
     }
@@ -2203,7 +2225,8 @@ public class HRegion implements HeapSize { // , Writable{
       Mutation mutation = batchOp.operations[firstIndex];
       if (walEdit.size() > 0) {
         txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
-            walEdit, mutation.getClusterId(), now, this.htableDescriptor);
+            walEdit, mutation.getClusterId(), now, this.htableDescriptor,
+            this.regionSequenceId.incrementAndGet());
       }
 
       // -------------------------------
@@ -3323,7 +3346,8 @@ public class HRegion implements HeapSize { // , Writable{
         if(bulkLoadListener != null) {
           finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
         }
-        store.bulkLoadHFile(finalPath, assignSeqId ? this.log.obtainSeqNum() : -1);
+        store.bulkLoadHFile(finalPath, assignSeqId ?
+            this.regionSequenceId.incrementAndGet() : -1);
         if(bulkLoadListener != null) {
           bulkLoadListener.doneBulkLoad(familyName, path);
         }
@@ -4105,10 +4129,6 @@ public class HRegion implements HeapSize { // , Writable{
     checkCompressionCodecs();
 
     this.openSeqNum = initialize(reporter);
-    if (this.log != null) {
-      this.log.setSequenceNumber(this.openSeqNum);
-    }
-
     return this;
   }
@@ -4520,7 +4540,8 @@ public class HRegion implements HeapSize { // , Writable{
           if (!walEdit.isEmpty()) {
             txid = this.log.appendNoSync(this.getRegionInfo(),
                 this.htableDescriptor.getName(), walEdit,
-                processor.getClusterId(), now, this.htableDescriptor);
+                processor.getClusterId(), now, this.htableDescriptor,
+                this.regionSequenceId.incrementAndGet());
           }
           // 8. Release region lock
           if (locked) {
@@ -4747,7 +4768,7 @@ public class HRegion implements HeapSize { // , Writable{
           // as a Put.
           txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
               walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
-              this.htableDescriptor);
+              this.htableDescriptor, this.regionSequenceId.incrementAndGet());
         } else {
           recordMutationWithoutWal(append.getFamilyMap());
         }
@@ -4897,7 +4918,7 @@ public class HRegion implements HeapSize { // , Writable{
           // as a Put.
           txid = this.log.appendNoSync(this.getRegionInfo(), this.htableDescriptor.getName(),
               walEdits, HConstants.DEFAULT_CLUSTER_ID, EnvironmentEdgeManager.currentTimeMillis(),
-              this.htableDescriptor);
+              this.htableDescriptor, this.regionSequenceId.incrementAndGet());
         } else {
           recordMutationWithoutWal(increment.getFamilyMap());
         }
@@ -4962,7 +4983,7 @@ public class HRegion implements HeapSize { // , Writable{
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      38 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (11 * Bytes.SIZEOF_LONG) +
       4 * Bytes.SIZEOF_BOOLEAN);
@@ -4979,7 +5000,8 @@ public class HRegion implements HeapSize { // , Writable{
   public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
       ClassSize.OBJECT + // closeLock
       (2 * ClassSize.ATOMIC_BOOLEAN) + // closed, closing
-      (3 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL
+      (4 * ClassSize.ATOMIC_LONG) + // memStoreSize, numPutsWithoutWAL, dataInMemoryWithoutWAL,
+                                    // regionSequenceId
       (2 * ClassSize.CONCURRENT_HASHMAP) + // lockedRows, scannerReadPoints
       WriteState.HEAP_SIZE + // writestate
       ClassSize.CONCURRENT_SKIPLISTMAP + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + // stores
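Taken together, the HRegion changes above replace the WAL-owned global sequence number with a counter owned by each region. The pattern is small enough to state on its own. The sketch below is illustrative only (the class and method names are mine, not HBase API); it shows the three operations the patch needs: seed the counter on region open, increment it once per WAL-visible event, and read it for a roll-time snapshot.

    import java.util.concurrent.atomic.AtomicLong;

    // Illustrative only: one counter per region; every WAL-visible event
    // (edit append, flush point, compaction marker) takes the next value,
    // so all events of a region are totally ordered by sequenceId.
    class RegionSequencer {
      private final AtomicLong seq = new AtomicLong();

      void initialize(long recoveredSeqId) {
        seq.set(recoveredSeqId); // e.g. the max seqId found in store files on open
      }

      long next() {             // called once per append/flush/marker
        return seq.incrementAndGet();
      }

      long current() {          // read for the snapshot taken at log-roll time
        return seq.longValue();
      }
    }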
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index c894fe5..b765aef 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -188,6 +188,7 @@
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.snapshot.RegionServerSnapshotManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.regionserver.wal.HLogServices;
 import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.security.User;
@@ -234,7 +235,7 @@ import com.google.protobuf.TextFormat;
 @SuppressWarnings("deprecation")
 public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
   AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
-  HBaseRPCErrorHandler, LastSequenceId {
+  HBaseRPCErrorHandler, LastSequenceId, HLogServices {
 
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
@@ -1435,7 +1436,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
       Path logdir = new Path(rootDir, logName);
       if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
       this.hlogForMeta = HLogFactory.createMetaHLog(this.fs.getBackingFs(), rootDir, logName,
-          this.conf, getMetaWALActionListeners(), this.serverNameFromMasterPOV.toString());
+          this.conf, getMetaWALActionListeners(), this.serverNameFromMasterPOV.toString(), this);
       return this.hlogForMeta;
     }
@@ -1448,7 +1449,7 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
    */
   protected HLog instantiateHLog(Path rootdir, String logName) throws IOException {
     return HLogFactory.createHLog(this.fs.getBackingFs(), rootdir, logName, this.conf,
-        getWALActionListeners(), this.serverNameFromMasterPOV.toString());
+        getWALActionListeners(), this.serverNameFromMasterPOV.toString(), this);
   }
@@ -4286,4 +4287,14 @@ public class HRegionServer implements ClientProtos.ClientService.BlockingInterfa
     }
     return result;
   }
+
+  @Override
+  public Map<byte[], Long> obtainRegionsSequenceId() {
+    Map<byte[], Long> regionsSequenceId = new HashMap<byte[], Long>();
+    Collection<HRegion> onlineRegions = this.getOnlineRegionsLocalContext();
+    for (HRegion r : onlineRegions) {
+      regionsSequenceId.put(r.getRegionInfo().getEncodedNameAsBytes(), r.getRegionSequenceId());
+    }
+    return regionsSequenceId;
+  }
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 4624b9f..ad3fb22 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1023,7 +1023,8 @@ public class HStore implements Store {
     CompactionDescriptor compactionDescriptor = ProtobufUtil.toCompactionDescriptor(info,
         family.getName(), inputPaths, outputPaths, fs.getStoreDir(getFamily().getNameAsString()));
     HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
-        this.region.getRegionInfo(), compactionDescriptor);
+        this.region.getRegionInfo(), compactionDescriptor,
+        this.region.incrementAndGetRegionSequenceId());
   }
 
   private void replaceStoreFiles(final Collection<StoreFile> compactedFiles,
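A subtlety worth keeping in mind for obtainRegionsSequenceId() above and the snapshot maps in FSHLog below: the maps are keyed on the encoded region name as a byte[], and Java arrays inherit identity-based equals()/hashCode(). A HashMap<byte[], Long> therefore only finds a key when the lookup uses the very same array instance. This is fine if HRegionInfo.getEncodedNameAsBytes() hands back a cached array for the life of the region (an assumption of this note, not something the patch states); a comparator-based map sidesteps the question entirely, as this small demo shows.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.util.Bytes;

    public class ByteArrayKeyDemo {
      public static void main(String[] args) {
        byte[] k1 = Bytes.toBytes("region-1");
        byte[] k2 = Bytes.toBytes("region-1"); // equal content, different instance

        Map<byte[], Long> hash = new HashMap<byte[], Long>();
        hash.put(k1, 42L);
        System.out.println(hash.containsKey(k2)); // false: arrays hash by identity

        Map<byte[], Long> tree = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
        tree.put(k1, 42L);
        System.out.println(tree.containsKey(k2)); // true: content comparison
      }
    }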
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 0ea32db..2fc1483 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -26,13 +26,12 @@
 import java.lang.reflect.Method;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -140,13 +139,6 @@ class FSHLog implements HLog, Syncable {
   Writer writer;
 
   /**
-   * Map of all log files but the current one.
-   */
-  final SortedMap<Long, Path> outputfiles =
-    Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
-
-
-  /**
    * This lock synchronizes all operations on oldestUnflushedSeqNums and oldestFlushingSeqNums,
    * with the exception of append's putIfAbsent into oldestUnflushedSeqNums.
    * We only use these to find out the low bound seqNum, or to find regions with old seqNums to
@@ -174,8 +166,6 @@ class FSHLog implements HLog, Syncable {
 
   private volatile boolean closed = false;
 
-  private final AtomicLong logSeqNum = new AtomicLong(0);
-
   private boolean forMeta = false;
 
   // The timestamp (in ms) when the log file was created.
@@ -227,6 +217,68 @@ class FSHLog implements HLog, Syncable {
   private final MetricsWAL metrics;
 
   /**
+   * HLogServices instance used to obtain services, such as the regions' sequenceIds, at the time
+   * of a log roll.
+   */
+  private HLogServices logService;
+
+  /**
+   * This map holds the logs of this regionserver before they are archived. While archiving, we
+   * first look at the oldest log. The comparator used in this map sorts the log files in the
+   * order of their create timestamp, which is embedded in the log file name.
+   */
+  private final ConcurrentSkipListMap<Path, Map<byte[], Long>> hLogFileToRegionSequenceIds =
+      new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_FILES_COMPARATOR);
+
+  /**
+   * A comparator for log file names; the comparison is based on the timestamp attribute embedded
+   * in the log file name itself. It also takes care of .META. logs. A correct log file name
+   * format is regionserver_hostname-regionserver_port.start-timestamp, or
+   * regionserver_hostname-regionserver_port.start-timestamp{@link #META_HLOG_FILE_EXTN}.
+   * <p>
+   * In case of an invalid log file name, it logs a warning but doesn't abort the regionserver.
+   * In any case, a path should never be null.
+   */
+  static final Comparator<Path> LOG_FILES_COMPARATOR = new Comparator<Path>() {
+
+    @Override
+    public int compare(Path path1, Path path2) {
+      String str1 = path1.toString();
+      String str2 = path2.toString();
+      // the last entity in the log file name is the timestamp.
+      long t1 = 0, t2 = 0;
+      t1 = extractTimeStampFromHLogName(str1);
+      t2 = extractTimeStampFromHLogName(str2);
+      if (t1 == 0 || t2 == 0) {
+        // one of them doesn't have a valid timestamp.
+        // fall back on the original path compare method.
+        return path1.compareTo(path2);
+      } else if (t1 == t2) return 0;
+      return (t1 > t2 ? 1 : -1);
+    }
+  };
+
+  /**
+   * Extracts the timestamp from the log file name. In case the name is not in the correct
+   * format, it returns 0.
+   * @param fileName the log file name
+   * @return the embedded timestamp, or 0 if the name is not in the expected format
+   */
+  private static long extractTimeStampFromHLogName(String fileName) {
+    long ts = 0;
+    try {
+      if (fileName.endsWith(META_HLOG_FILE_EXTN)) {
+        String str1 = fileName.substring(0, fileName.indexOf(META_HLOG_FILE_EXTN));
+        ts = Long.parseLong(str1.substring(str1.lastIndexOf(".") + 1));
+      } else {
+        ts = Long.parseLong(fileName.substring(fileName.lastIndexOf(".") + 1));
+      }
+    } catch (Exception e) {
+      LOG.warn("Got an exception while extracting the timestamp from a log file name.", e);
+    }
+    return ts;
+  }
+
+  /**
    * Constructor.
    *
    * @param fs filesystem handle
@@ -239,7 +291,7 @@ class FSHLog implements HLog, Syncable {
               final Configuration conf)
   throws IOException {
     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
-        conf, null, true, null, false);
+        conf, null, true, null, false, null);
   }
 
   /**
@@ -256,7 +308,7 @@ class FSHLog implements HLog, Syncable {
                 final String oldLogDir, final Configuration conf)
   throws IOException {
     this(fs, root, logDir, oldLogDir,
-        conf, null, true, null, false);
+        conf, null, true, null, false, null);
   }
 
   /**
@@ -276,13 +328,14 @@ class FSHLog implements HLog, Syncable {
    * @param prefix should always be hostname and port in distributed env and
    *        it will be URL encoded before being used.
    *        If prefix is null, "hlog" will be used
+   * @param hlogServices service to get regions' information while rolling.
    * @throws IOException
    */
   public FSHLog(final FileSystem fs, final Path root, final String logDir,
       final Configuration conf, final List<WALActionsListener> listeners,
-      final String prefix) throws IOException {
+      final String prefix, HLogServices hlogServices) throws IOException {
     this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME,
-        conf, listeners, true, prefix, false);
+        conf, listeners, true, prefix, false, hlogServices);
   }
 
   /**
@@ -305,12 +358,13 @@ class FSHLog implements HLog, Syncable {
    *        it will be URL encoded before being used.
    *        If prefix is null, "hlog" will be used
    * @param forMeta if this hlog is meant for meta updates
+   * @param hlogServices service to get regions' information while rolling.
    * @throws IOException
    */
-  public FSHLog(final FileSystem fs, final Path root, final String logDir,
-      final String oldLogDir, final Configuration conf,
-      final List<WALActionsListener> listeners,
-      final boolean failIfLogDirExists, final String prefix, boolean forMeta)
+  public FSHLog(final FileSystem fs, final Path root, final String logDir, final String oldLogDir,
+      final Configuration conf, final List<WALActionsListener> listeners,
+      final boolean failIfLogDirExists, final String prefix, boolean forMeta,
+      HLogServices hlogServices)
   throws IOException {
     super();
     this.fs = fs;
@@ -319,6 +373,7 @@ class FSHLog implements HLog, Syncable {
     this.oldLogDir = new Path(this.rootDir, oldLogDir);
     this.forMeta = forMeta;
     this.conf = conf;
+    this.logService = hlogServices;
 
     if (listeners != null) {
       for (WALActionsListener i: listeners) {
@@ -433,21 +488,6 @@ class FSHLog implements HLog, Syncable {
     return this.filenum;
   }
 
-  @Override
-  public void setSequenceNumber(final long newvalue) {
-    for (long id = this.logSeqNum.get(); id < newvalue &&
-        !this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
-      // This could spin on occasion but better the occasional spin than locking
-      // every increment of sequence number.
-      LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
-    }
-  }
-
-  @Override
-  public long getSequenceNumber() {
-    return logSeqNum.get();
-  }
-
   /**
    * Method used internal to this class and for tests only.
    * @return The wrapped stream our writer is using; its not the
@@ -510,6 +550,7 @@ class FSHLog implements HLog, Syncable {
 
       Path oldFile = null;
       int oldNumEntries = 0;
+      Map<byte[], Long> regionsToSequenceId = new HashMap<byte[], Long>();
       synchronized (updateLock) {
         // Clean up current writer.
         oldNumEntries = this.numEntries.get();
         oldFile = cleanupCurrentWriter(currentFilenum);
         this.writer = nextWriter;
         this.hdfs_out = nextHdfsOut;
         this.numEntries.set(0);
+        if (logService != null) {
+          regionsToSequenceId = logService.obtainRegionsSequenceId();
+        }
+      }
       if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
-      else LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries +
-        ", filesize=" + StringUtils.humanReadableInt(this.fs.getFileStatus(oldFile).getLen()) +
-        "; new WAL " + FSUtils.getPath(newPath));
+      else {
+        LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries=" + oldNumEntries
+            + ", filesize="
+            + StringUtils.humanReadableInt(this.fs.getFileStatus(oldFile).getLen())
+            + "; new WAL " + FSUtils.getPath(newPath));
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("RegionSequenceId map for this log file: " + regionsToSequenceId);
+          for (Map.Entry<byte[], Long> e : regionsToSequenceId.entrySet()) {
+            LOG.trace("Entry: " + Bytes.toString(e.getKey()) + ", " + e.getValue());
+          }
+        }
+        this.hLogFileToRegionSequenceIds.put(oldFile, regionsToSequenceId);
+      }
 
       // Tell our listeners that a new log was created
       if (!this.listeners.isEmpty()) {
@@ -561,37 +616,73 @@ class FSHLog implements HLog, Syncable {
     return HLogFactory.createWriter(fs, path, conf);
   }
 
-  /*
-   * Clean up old commit logs.
-   * @return If lots of logs, flush the returned region so next time through
-   * we can clean logs. Returns null if nothing to flush. Returns array of
-   * encoded region names to flush.
+  /**
+   * Clean up old commit logs. It is called while rolling the current log.
+   * <p>
+   * While rolling a WAL, we create a map from each online region on the hosting region server to
+   * its current sequenceId, i.e., logFile_path <--> {[RegionId :
+   * Current_SequenceId_at_time_of_rolling]}. This map of region:sequenceIds is a snapshot of the
+   * sequenceIds taken at the time this WAL was rolled. While cleaning a WAL, we compare these
+   * sequenceIds with the ones present in the current oldestUnflushedSeqNums and
+   * oldestFlushingSeqNums collections. If, for a WAL, all of its snapshotted sequenceIds are
+   * smaller, it is eligible for archiving. Otherwise, it still has entries which are not flushed
+   * yet.
+   * <p>
+   * @throws IOException
+   */
   private void cleanOldLogs() throws IOException {
-    long oldestOutstandingSeqNum = Long.MAX_VALUE;
-    synchronized (oldestSeqNumsLock) {
-      Long oldestFlushing = (oldestFlushingSeqNums.size() > 0)
-        ? Collections.min(oldestFlushingSeqNums.values()) : Long.MAX_VALUE;
-      Long oldestUnflushed = (oldestUnflushedSeqNums.size() > 0)
-        ? Collections.min(oldestUnflushedSeqNums.values()) : Long.MAX_VALUE;
-      oldestOutstandingSeqNum = Math.min(oldestFlushing, oldestUnflushed);
-    }
+    // The below two variables are local copies of oldestFlushingSeqNums and
+    // oldestUnflushedSeqNums. The crux of these maps is that we need to do book-keeping of the
+    // sequenceIds, per region, that still depend on the WAL. That is, we are interested in the
+    // oldest sequence numbers which are NOT yet flushed. That info is kept in the
+    // oldestUnflushedSeqNums map. Since flushes happen in parallel while we are rolling the WAL,
+    // we also need a map of sequenceIds that are in the process of flushing, but haven't
+    // completed yet. For this, we have a separate map: oldestFlushingSeqNums. We make local
+    // copies of these maps in order to compute whether a WAL is eligible for archiving or not.
+    Map<byte[], Long> oldestFlushingSeqNumsTemp = null;
+    Map<byte[], Long> oldestUnflushedSeqNumsTemp = null;
 
-    // Get the set of all log files whose last sequence number is smaller than
-    // the oldest edit's sequence number.
-    TreeSet<Long> sequenceNumbers = new TreeSet<Long>(this.outputfiles.headMap(
-        oldestOutstandingSeqNum).keySet());
-    // Now remove old log files (if any)
-    if (LOG.isDebugEnabled()) {
-      if (sequenceNumbers.size() > 0) {
-        LOG.debug("Found " + sequenceNumbers.size() + " hlogs to remove" +
-          " out of total " + this.outputfiles.size() + ";" +
-          " oldest outstanding sequenceid is " + oldestOutstandingSeqNum);
+    synchronized (oldestSeqNumsLock) {
+      oldestFlushingSeqNumsTemp = new HashMap<byte[], Long>(oldestFlushingSeqNums);
+      oldestUnflushedSeqNumsTemp = new HashMap<byte[], Long>(oldestUnflushedSeqNums);
     }
+    List<Path> logsToArchive = new ArrayList<Path>();
+    for (Map.Entry<Path, Map<byte[], Long>> e : hLogFileToRegionSequenceIds.entrySet()) {
+      // iterate over the log file
+      Path logPath = e.getKey();
+      Map<byte[], Long> regionSequenceIds = e.getValue();
+      boolean archiveWALFile = true;
+      for (byte[] regionName : regionSequenceIds.keySet()) {
+        // check its value in the flushing/unflushed collections.
+        // If a region entry is not there, that means the region has already moved, closed, etc.
+        // We use Long.MAX_VALUE for it in the comparison here.
+        long oldestFlushingSeqNumForRegion =
+            (oldestFlushingSeqNumsTemp.containsKey(regionName) ? oldestFlushingSeqNumsTemp
+                .get(regionName) : Long.MAX_VALUE);
+        long oldestUnFlushedSeqNumForRegion =
+            (oldestUnflushedSeqNumsTemp.containsKey(regionName) ? oldestUnflushedSeqNumsTemp
+                .get(regionName) : Long.MAX_VALUE);
+        // Compare the min of the above two numbers to this log file's snapshot of region
+        // sequenceIds. If the snapshot has the smaller value, the file is safe to archive from
+        // this region's perspective.
+        if (regionSequenceIds.get(regionName) >= Math.min(oldestFlushingSeqNumForRegion,
+          oldestUnFlushedSeqNumForRegion)) {
+          archiveWALFile = false;
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("The file has a region which has some unflushed entries, " + logPath
+                + ", region name: " + Bytes.toString(regionName));
+            LOG.trace("Math.min(oldestFlushingSeqNumForRegion, oldestUnFlushedSeqNumForRegion): "
                + Math.min(oldestFlushingSeqNumForRegion, oldestUnFlushedSeqNumForRegion));
+          }
+          break;
+        }
+      }
+      if (archiveWALFile) logsToArchive.add(logPath);
     }
-    for (Long seq : sequenceNumbers) {
-      archiveLogFile(this.outputfiles.remove(seq), seq);
+    LOG.debug("WALs to archive: " + logsToArchive.toString());
+    for (Path p : logsToArchive) {
+      archiveLogFile(p);
+      hLogFileToRegionSequenceIds.remove(p);
     }
   }
@@ -615,31 +706,58 @@ class FSHLog implements HLog, Syncable {
         .toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
   }
 
+  /**
+   * If the number of log files is greater than the configured maximum, we look at the first WAL
+   * file to figure out which regions we should flush so that the oldest WAL can be archived.
+   * @return encoded names of regions which should be flushed to free up the oldest WAL file.
+   */
   private byte[][] getRegionsToForceFlush() throws IOException {
-    // If too many log files, figure which regions we need to flush.
-    // Array is an array of encoded region names.
-    byte [][] regions = null;
-    int logCount = getNumLogFiles();
-    if (logCount > this.maxLogs && logCount > 0) {
-      // This is an array of encoded region names.
-      synchronized (oldestSeqNumsLock) {
-        regions = findMemstoresWithEditsEqualOrOlderThan(this.outputfiles.firstKey(),
-          this.oldestUnflushedSeqNums);
-      }
-      if (regions != null) {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < regions.length; i++) {
-          if (i > 0) sb.append(", ");
-          sb.append(Bytes.toStringBinary(regions[i]));
-        }
-        LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" +
-          this.maxLogs + "; forcing flush of " + regions.length + " regions(s): " +
-          sb.toString());
+    // If too many log files, check the first log file and figure out the regions which should
+    // be flushed to get rid of this file.
+    byte[][] regions = null;
+    int logCount = hLogFileToRegionSequenceIds.size();
+    LOG.debug("Count of HLogs: " + logCount + ", maxLogs: " + this.maxLogs);
+    if (logCount > 0 && logCount > this.maxLogs) {
+      regions = findRegionsInWALToFlush(hLogFileToRegionSequenceIds.firstKey(),
+        this.oldestUnflushedSeqNums);
+    }
+    if (regions != null) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < regions.length; i++) {
+        if (i > 0) sb.append(", ");
+        sb.append(Bytes.toStringBinary(regions[i]));
       }
+      LOG.info("Too many hlogs: logs=" + logCount + ", maxlogs=" + this.maxLogs
+          + "; forcing flush of " + regions.length + " regions(s): " + sb.toString());
     }
     return regions;
   }
 
+  /**
+   * Finds the regions which are stopping us from archiving this file; flushing these regions
+   * will help in archiving it.
+   * @param logFile the oldest WAL file
+   * @param oldestUnflushedSeqNums2 map of region name to its oldest unflushed sequenceId
+   * @return encoded names of regions which should be flushed to free up the WAL file.
+   */
+  private byte[][] findRegionsInWALToFlush(Path logFile,
+      ConcurrentSkipListMap<byte[], Long> oldestUnflushedSeqNums2) {
+    LOG.debug("Checking the first file: " + logFile.toString());
+    List<byte[]> regions = null;
+    Map<byte[], Long> regionSequenceIdsSnapshot = this.hLogFileToRegionSequenceIds.get(logFile);
+    // iterate over this map and see which regions are blocking the archiving.
+    for (Map.Entry<byte[], Long> e : regionSequenceIdsSnapshot.entrySet()) {
+      Long oldestUnflushedSeqIdForRegion = oldestUnflushedSeqNums2.get(e.getKey());
+      if (oldestUnflushedSeqIdForRegion != null &&
+          oldestUnflushedSeqIdForRegion.longValue() <= e.getValue()) {
+        if (regions == null) regions = new ArrayList<byte[]>();
+        regions.add(e.getKey());
+      }
+    }
+    if (regions != null) LOG.debug("Regions to flush: " + regions.toString());
+    return regions == null ? null : regions.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
+  }
+
   /*
    * Cleans up current writer closing and adding to outputfiles.
    * Presumes we're operating inside an updateLock scope.
@@ -683,17 +801,14 @@ class FSHLog implements HLog, Syncable {
       }
       if (currentfilenum >= 0) {
         oldFile = computeFilename(currentfilenum);
-        this.outputfiles.put(Long.valueOf(this.logSeqNum.get()), oldFile);
       }
     }
     return oldFile;
   }
 
-  private void archiveLogFile(final Path p, final Long seqno) throws IOException {
+  private void archiveLogFile(final Path p) throws IOException {
     Path newPath = getHLogArchivePath(this.oldLogDir, p);
-    LOG.info("moving old hlog file " + FSUtils.getPath(p) +
-      " whose highest sequenceid is " + seqno + " to " +
-      FSUtils.getPath(newPath));
+    LOG.info("moving old hlog file " + FSUtils.getPath(p) + " to " + FSUtils.getPath(newPath));
 
     // Tell our listeners that a log is going to be archived.
     if (!this.listeners.isEmpty()) {
@@ -828,15 +943,16 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
-    final long now, HTableDescriptor htd)
+    final long now, HTableDescriptor htd, long regionSequenceId)
   throws IOException {
-    append(info, tableName, edits, now, htd, true);
+    append(info, tableName, edits, now, htd, true, regionSequenceId);
   }
 
   @Override
-  public void append(HRegionInfo info, byte [] tableName, WALEdit edits,
-    final long now, HTableDescriptor htd, boolean isInMemstore) throws IOException {
-    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore);
+  public void append(HRegionInfo info, byte[] tableName, WALEdit edits, final long now,
+      HTableDescriptor htd, boolean isInMemstore, long regionSequenceId) throws IOException {
+    append(info, tableName, edits, HConstants.DEFAULT_CLUSTER_ID, now, htd, true, isInMemstore,
+      regionSequenceId);
   }
 
   /**
@@ -862,12 +978,14 @@ class FSHLog implements HLog, Syncable {
    * @param clusterId The originating clusterId for this edit (for replication)
    * @param now
    * @param doSync shall we sync?
+   * @param regionSequenceId
    * @return txid of this transaction
    * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  private long append(HRegionInfo info, byte [] tableName, WALEdit edits, UUID clusterId,
-      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore)
+  private long append(HRegionInfo info, byte[] tableName, WALEdit edits, UUID clusterId,
+      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore,
+      long regionSequenceId)
     throws IOException {
     if (edits.isEmpty()) return this.unflushedEntries.get();
     if (this.closed) {
      throw new IOException("Cannot append; log is closed");
     }
@@ -875,7 +993,6 @@ class FSHLog implements HLog, Syncable {
     long txid = 0;
     synchronized (this.updateLock) {
-      long seqNum = obtainSeqNum();
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region (i.e. the first edit added to the particular
       // memstore). . When the cache is flushed, the entry for the
@@ -884,8 +1001,10 @@ class FSHLog implements HLog, Syncable {
       // Use encoded name.  Its shorter, guaranteed unique and a subset of
      // actual name.
       byte [] encodedRegionName = info.getEncodedNameAsBytes();
-      if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
-      HLogKey logKey = makeKey(encodedRegionName, tableName, seqNum, now, clusterId);
+      if (isInMemstore) {
+        this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, regionSequenceId);
+      }
+      HLogKey logKey = makeKey(encodedRegionName, tableName, regionSequenceId, now, clusterId);
       doWrite(info, logKey, edits, htd);
       this.numEntries.incrementAndGet();
       txid = this.unflushedEntries.incrementAndGet();
@@ -906,9 +1025,9 @@ class FSHLog implements HLog, Syncable {
 
   @Override
   public long appendNoSync(HRegionInfo info, byte [] tableName, WALEdit edits,
-    UUID clusterId, final long now, HTableDescriptor htd)
+    UUID clusterId, final long now, HTableDescriptor htd, long regionSequenceId)
     throws IOException {
-    return append(info, tableName, edits, clusterId, now, htd, false, true);
+    return append(info, tableName, edits, clusterId, now, htd, false, true, regionSequenceId);
   }
 
   /**
@@ -1234,21 +1353,16 @@ class FSHLog implements HLog, Syncable {
     return numEntries.get();
   }
 
-  @Override
-  public long obtainSeqNum() {
-    return this.logSeqNum.incrementAndGet();
-  }
-
-  /** @return the number of log files in use */
+  /** @return the number of log files in use (excluding the current one) */
   int getNumLogFiles() {
-    return outputfiles.size();
+    return hLogFileToRegionSequenceIds.size();
   }
 
   @Override
-  public Long startCacheFlush(final byte[] encodedRegionName) {
+  public boolean startCacheFlush(final byte[] encodedRegionName) {
     Long oldRegionSeqNum = null;
     if (!closeBarrier.beginOp()) {
-      return null;
+      return false;
     }
     synchronized (oldestSeqNumsLock) {
       oldRegionSeqNum = this.oldestUnflushedSeqNums.remove(encodedRegionName);
@@ -1267,7 +1381,7 @@ class FSHLog implements HLog, Syncable {
       LOG.warn("Couldn't find oldest seqNum for the region we are about to flush: ["
         + Bytes.toString(encodedRegionName) + "]");
     }
-    return obtainSeqNum();
+    return true;
   }
 
   @Override
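The archiving rule that cleanOldLogs() implements reduces to a pure predicate: a rolled WAL may be archived if and only if, for every region it touched, the sequenceId snapshotted at roll time is below the oldest edit of that region still unflushed (or mid-flush). The following standalone restatement of that predicate is a reader's sketch, not patch code; it assumes content-comparing maps (see the byte[] key note earlier).

    import java.util.Map;

    final class WalArchiveCheck {
      /**
       * @param snapshotAtRoll  region -> last sequenceId written to this WAL
       * @param oldestUnflushed region -> oldest seqId not yet flushed (absent if none)
       * @param oldestFlushing  region -> oldest seqId of an in-flight flush
       * @return true if every edit in this WAL is already durable in store files
       */
      static boolean canArchive(Map<byte[], Long> snapshotAtRoll,
          Map<byte[], Long> oldestUnflushed, Map<byte[], Long> oldestFlushing) {
        for (Map.Entry<byte[], Long> e : snapshotAtRoll.entrySet()) {
          long unflushed = value(oldestUnflushed, e.getKey());
          long flushing = value(oldestFlushing, e.getKey());
          // some edit of this region in the WAL is still pending a flush
          if (e.getValue() >= Math.min(unflushed, flushing)) return false;
        }
        return true;
      }

      private static long value(Map<byte[], Long> m, byte[] region) {
        Long v = m.get(region);
        return v == null ? Long.MAX_VALUE : v; // region closed/moved: nothing pending
      }
    }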
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 548364b..fa4c5fc 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -199,22 +199,6 @@ public interface HLog {
   long getFilenum();
 
   /**
-   * Called by HRegionServer when it opens a new region to ensure that log
-   * sequence numbers are always greater than the latest sequence number of the
-   * region being brought on-line.
-   *
-   * @param newvalue
-   *          We'll set log edit/sequence number to this value if it is greater
-   *          than the current value.
-   */
-  void setSequenceNumber(final long newvalue);
-
-  /**
-   * @return log sequence number
-   */
-  long getSequenceNumber();
-
-  /**
    * Roll the log writer. That is, start writing log messages to a new file.
    * <p>
@@ -263,12 +247,12 @@ public interface HLog {
   void closeAndDelete() throws IOException;
 
   /**
-   * Same as {@link #appendNoSync(HRegionInfo, byte[], WALEdit, UUID, long, HTableDescriptor)},
-   * except it causes a sync on the log
+   * Same as {@link #appendNoSync(HRegionInfo, byte[], WALEdit, UUID, long, HTableDescriptor,
+   * long)}, except it causes a sync on the log
+   * @param regionSequenceId region sequenceId to use while creating the WAL entry's key.
    */
-  void append(
-      HRegionInfo info, byte[] tableName, WALEdit edits, final long now, HTableDescriptor htd
-  ) throws IOException;
+  void append(HRegionInfo info, byte[] tableName, WALEdit edits, final long now,
+      HTableDescriptor htd, long regionSequenceId) throws IOException;
 
   /**
    * Append a set of edits to the log. Log edits are keyed by (encoded)
@@ -280,6 +264,7 @@ public interface HLog {
    * @param now
    * @param htd
    * @param isInMemstore Whether the record is in memstore. False for system records.
+   * @param regionSequenceId region sequenceId to use while creating the WAL entry's key.
    */
   void append(
     HRegionInfo info,
@@ -287,7 +272,7 @@ public interface HLog {
     WALEdit edits,
     final long now,
     HTableDescriptor htd,
-    boolean isInMemstore
+    boolean isInMemstore, long regionSequenceId
   ) throws IOException;
 
   /**
@@ -302,6 +287,7 @@ public interface HLog {
    *          The originating clusterId for this edit (for replication)
    * @param now
    * @param htd
+   * @param regionSequenceId region sequenceId to use while creating the WAL entry's key.
    * @return txid of this transaction
    * @throws IOException
    */
@@ -311,7 +297,7 @@ public interface HLog {
     WALEdit edits,
     UUID clusterId,
     final long now,
-    HTableDescriptor htd
+    HTableDescriptor htd, long regionSequenceId
   ) throws IOException;
 
   void hsync() throws IOException;
@@ -323,26 +309,19 @@ public interface HLog {
   void sync(long txid) throws IOException;
 
   /**
-   * Obtain a log sequence number.
-   */
-  long obtainSeqNum();
-
-  /**
    * WAL keeps track of the sequence numbers that were not yet flushed from memstores
    * in order to be able to do cleanup. This method tells WAL that some region is about
    * to flush memstore.
    *
    * We stash the oldest seqNum for the region, and let the the next edit inserted in this
-   * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor)}
-   * as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
+   * region be recorded in {@link #append(HRegionInfo, byte[], WALEdit, long, HTableDescriptor,
+   * long)} as new oldest seqnum. In case of flush being aborted, we put the stashed value back;
    * in case of flush succeeding, the seqNum of that first edit after start becomes the
    * valid oldest seqNum for this region.
-   *
-   * @return current seqNum, to pass on to flushers (who will put it into the metadata of
-   *         the resulting file as an upper-bound seqNum for that file), or NULL if flush
-   *         should not be started.
+   * @return true if it is good to continue with the flush and the current WAL is not closing,
+   *         false otherwise.
    */
-  Long startCacheFlush(final byte[] encodedRegionName);
+  boolean startCacheFlush(final byte[] encodedRegionName);
 
   /**
    * Complete the cache flush.
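With the interface change above, startCacheFlush() no longer mints the flush sequenceId; the region produces it from its own counter. A sketch of the resulting calling convention, mirroring what HRegion does earlier in this patch. The memstore snapshot/write steps are elided, and I am assuming abortCacheFlush() is the existing HLog abort hook of this era (it is not shown in this diff).

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hadoop.hbase.regionserver.wal.HLog;

    // Sketch of the revised flush protocol from the region's side.
    class FlushSketch {
      boolean flush(HLog wal, byte[] encodedRegionName, AtomicLong regionSequenceId)
          throws IOException {
        if (!wal.startCacheFlush(encodedRegionName)) {
          return false; // WAL is going away; do not start the flush
        }
        // The region, not the WAL, now mints the id bounding this flush.
        long flushSeqId = regionSequenceId.incrementAndGet();
        try {
          // ... snapshot memstore, write store files tagged with flushSeqId ...
          wal.completeCacheFlush(encodedRegionName);
          return true;
        } catch (IOException ioe) {
          wal.abortCacheFlush(encodedRegionName); // restore the stashed oldest seqNum
          throw ioe;
        }
      }
    }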
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
index fb97f22..e8c6723 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogFactory.java
@@ -52,15 +52,15 @@ public class HLogFactory {
 
     public static HLog createHLog(final FileSystem fs, final Path root, final String logName,
       final Configuration conf, final List<WALActionsListener> listeners,
-      final String prefix) throws IOException {
-      return new FSHLog(fs, root, logName, conf, listeners, prefix);
+      final String prefix, HLogServices hlogServices) throws IOException {
+      return new FSHLog(fs, root, logName, conf, listeners, prefix, hlogServices);
     }
 
     public static HLog createMetaHLog(final FileSystem fs, final Path root, final String logName,
       final Configuration conf, final List<WALActionsListener> listeners,
-      final String prefix) throws IOException {
+      final String prefix, HLogServices hlogServices) throws IOException {
       return new FSHLog(fs, root, logName, HConstants.HREGION_OLDLOGDIR_NAME,
-        conf, listeners, false, prefix, true);
+        conf, listeners, false, prefix, true, hlogServices);
     }
 
     /*
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogServices.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogServices.java
new file mode 100644
index 0000000..ddd646f
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogServices.java
@@ -0,0 +1,37 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A service interface that provides services to the HLog. It should be implemented by HLog
+ * clients, for example HRegionServer, to provide services such as the region:sequenceId mapping.
+ */
+@InterfaceAudience.Private
+public interface HLogServices {
+  /**
+   * Gets the current sequenceIds of the online regions. It is called while closing an existing
+   * WAL file.
+   * @return a map from encoded region name to that region's current sequenceId
+   */
+  Map<byte[], Long> obtainRegionsSequenceId();
+}
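Outside a full HRegionServer, the new factory parameter can be satisfied by any object able to report per-region sequenceIds. The tests later in this patch do it with Mockito; the sketch below does the same with a plain anonymous implementation. Here fs, rootDir, and conf are assumed to be in scope, and the bookkeeping map is whatever the caller maintains.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.regionserver.wal.HLog;
    import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
    import org.apache.hadoop.hbase.regionserver.wal.HLogServices;
    import org.apache.hadoop.hbase.util.Bytes;

    class HLogServicesWiringSketch {
      // Caller-maintained bookkeeping: latest sequenceId per encoded region name.
      // A comparator-based map so lookups work by content, not array identity.
      final Map<byte[], Long> latestSeqIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

      HLog create(FileSystem fs, Path rootDir, Configuration conf) throws java.io.IOException {
        HLogServices services = new HLogServices() {
          @Override
          public Map<byte[], Long> obtainRegionsSequenceId() {
            synchronized (latestSeqIds) {
              return new HashMap<byte[], Long>(latestSeqIds); // defensive copy per roll
            }
          }
        };
        return HLogFactory.createHLog(fs, rootDir, "logs", conf, null, null, services);
      }
    }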
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
index 95fbda9..6ad8fee 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLogUtil.java
@@ -255,12 +255,18 @@ public class HLogUtil {
    * This provides info to the HMaster to allow it to recover the compaction if
    * this regionserver dies in the middle (This part is not yet implemented). It also prevents
    * the compaction from finishing if this regionserver has already lost its lease on the log.
+   * @param log the WAL to write the marker to
+   * @param htd the table descriptor
+   * @param info the region info
+   * @param c the compaction descriptor
+   * @param regionSequenceId the region's sequenceId to use for the marker entry
+   * @throws IOException
    */
   public static void writeCompactionMarker(HLog log, HTableDescriptor htd, HRegionInfo info,
-      final CompactionDescriptor c) throws IOException {
+      final CompactionDescriptor c, long regionSequenceId) throws IOException {
     WALEdit e = WALEdit.createCompaction(c);
     log.append(info, c.getTableName().toByteArray(), e,
-      EnvironmentEdgeManager.currentTimeMillis(), htd, false);
+      EnvironmentEdgeManager.currentTimeMillis(), htd, false, regionSequenceId);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
     }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
index 7a25881..8ec43d3 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
@@ -186,7 +186,7 @@ public class TestWALObserver {
     // it's where WAL write cp should occur.
     long now = EnvironmentEdgeManager.currentTimeMillis();
-    log.append(hri, hri.getTableName(), edit, now, htd);
+    log.append(hri, hri.getTableName(), edit, now, htd, 0);
 
     // the edit shall have been change now by the coprocessor.
     foundFamily0 = false;
@@ -242,14 +242,15 @@ public class TestWALObserver {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     // addFamilyMapToWALEdit(p.getFamilyMap(), edit);
     final int countPerFamily = 1000;
+    long regionSequenceId = 1L;
     // for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       // addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
       // EnvironmentEdgeManager.getDelegate(), wal);
       addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
-          EnvironmentEdgeManager.getDelegate(), wal, htd);
+          EnvironmentEdgeManager.getDelegate(), wal, htd, ++regionSequenceId);
     }
-    wal.append(hri, tableName, edit, now, htd);
+    wal.append(hri, tableName, edit, now, htd, ++regionSequenceId);
     // sync to fs.
     wal.sync();
@@ -369,7 +370,7 @@ public class TestWALObserver {
 
   private void addWALEdits(final byte[] tableName, final HRegionInfo hri, final byte[] rowName,
       final byte[] family, final int count,
-      EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd)
+      EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd, long regionSequenceId)
       throws IOException {
     String familyStr = Bytes.toString(family);
     for (int j = 0; j < count; j++) {
@@ -378,7 +379,7 @@ public class TestWALObserver {
       WALEdit edit = new WALEdit();
       edit.add(new KeyValue(rowName, family, qualifierBytes, ee
           .currentTimeMillis(), columnBytes));
-      wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd);
+      wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, regionSequenceId + j);
     }
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
index 485b22c..b3657fe 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHLogRecordReader.java
@@ -21,7 +21,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.mapreduce.HLogInputFormat.HLogRecordReader;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
+import org.apache.hadoop.hbase.regionserver.wal.HLogServices;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -47,6 +50,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * JUnit tests for the HLogRecordReader
@@ -110,14 +116,25 @@ public class TestHLogRecordReader {
    */
   @Test
   public void testPartialRead() throws Exception {
-    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
+    HLogServices mockLogService = Mockito.mock(HLogServices.class);
+    final Map<byte[], Long> regionsSequenceIds = new HashMap<byte[], Long>();
+    Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer(
+      new Answer<Map<byte[], Long>>() {
+        @Override
+        public Map<byte[], Long> answer(InvocationOnMock invocation) throws Throwable {
+          return new HashMap<byte[], Long>(regionsSequenceIds);
+        }
+      });
+    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf, null, null, mockLogService);
     long ts = System.currentTimeMillis();
+    long regionSequenceId = 1L;
     WALEdit edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"), ts, value));
-    log.append(info, tableName, edit, ts, htd);
+    log.append(info, tableName, edit, ts, htd, ++regionSequenceId);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"), ts+1, value));
-    log.append(info, tableName, edit, ts+1, htd);
+    log.append(info, tableName, edit, ts+1, htd, ++regionSequenceId);
+    regionsSequenceIds.put(info.getEncodedNameAsBytes(), regionSequenceId);
     log.rollWriter();
 
     Thread.sleep(1);
@@ -125,10 +142,11 @@ public class TestHLogRecordReader {
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("3"), ts1+1, value));
-    log.append(info, tableName, edit, ts1+1, htd);
+    log.append(info, tableName, edit, ts1+1, htd, ++regionSequenceId);
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("4"), ts1+2, value));
-    log.append(info, tableName, edit, ts1+2, htd);
+    log.append(info, tableName, edit, ts1+2, htd, ++regionSequenceId);
+    regionsSequenceIds.put(info.getEncodedNameAsBytes(), regionSequenceId);
     log.close();
 
     HLogInputFormat input = new HLogInputFormat();
@@ -158,23 +176,35 @@ public class TestHLogRecordReader {
    */
   @Test
   public void testHLogRecordReader() throws Exception {
-    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf);
+    HLogServices mockLogService = Mockito.mock(HLogServices.class);
+    final Map<byte[], Long> regionsSequenceIds = new HashMap<byte[], Long>();
+    Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer(
+      new Answer<Map<byte[], Long>>() {
+        @Override
+        public Map<byte[], Long> answer(InvocationOnMock invocation) throws Throwable {
+          return new HashMap<byte[], Long>(regionsSequenceIds);
+        }
+      });
+    HLog log = HLogFactory.createHLog(fs, hbaseDir, logName, conf, null, null, mockLogService);
     byte [] value = Bytes.toBytes("value");
     WALEdit edit = new WALEdit();
+    long regionSequenceId = 1L;
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("1"),
         System.currentTimeMillis(), value));
     log.append(info, tableName, edit,
-      System.currentTimeMillis(), htd);
+      System.currentTimeMillis(), htd, ++regionSequenceId);
 
     Thread.sleep(1); // make sure 2nd log gets a later timestamp
     long secondTs = System.currentTimeMillis();
+    regionsSequenceIds.put(info.getEncodedNameAsBytes(), regionSequenceId);
     log.rollWriter();
 
     edit = new WALEdit();
     edit.add(new KeyValue(rowName, family, Bytes.toBytes("2"),
         System.currentTimeMillis(), value));
     log.append(info, tableName, edit,
-      System.currentTimeMillis(), htd);
+      System.currentTimeMillis(), htd, ++regionSequenceId);
+    regionsSequenceIds.put(info.getEncodedNameAsBytes(), regionSequenceId);
     log.close();
 
     long thirdTs = System.currentTimeMillis();
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
index f704ddb..8cd8c7f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestDistributedLogSplitting.java
@@ -220,7 +220,6 @@ public class TestDistributedLogSplitting {
   public void testLogReplayWithNonMetaRSDown() throws Exception {
     LOG.info("testLogReplayWithNonMetaRSDown");
     Configuration curConf = HBaseConfiguration.create();
-    curConf.setLong("hbase.regionserver.hlog.blocksize", 100*1024);
     curConf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
     startCluster(NUM_RS, curConf);
     final int NUM_REGIONS_TO_CREATE = 40;
@@ -1124,6 +1123,7 @@ public class TestDistributedLogSplitting {
     }
     int n = hris.size();
     int[] counts = new int[n];
+    long regionSequenceId = 1L;
     if (n > 0) {
       for (int i = 0; i < num_edits; i += 1) {
         WALEdit e = new WALEdit();
@@ -1139,7 +1139,7 @@ public class TestDistributedLogSplitting {
         byte[] family = Bytes.toBytes(fname);
         byte[] qualifier = Bytes.toBytes("c" + Integer.toString(i));
         e.add(new KeyValue(row, family, qualifier, System.currentTimeMillis(), value));
-        log.append(curRegionInfo, table, e, System.currentTimeMillis(), htd);
+        log.append(curRegionInfo, table, e, System.currentTimeMillis(), htd, ++regionSequenceId);
         counts[i % n] += 1;
       }
     }
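Several of the remaining test changes, and testLogFileComparator below, lean on WAL file names sorting by the timestamp embedded in their last dot-separated component. For reference, here is that parsing logic in isolation with a couple of example names; the extension constant mirrors FSHLog's META_HLOG_FILE_EXTN but is declared locally for this demo.

    public class HLogNameTimestampDemo {
      static final String META_EXTN = ".meta"; // mirrors FSHLog's META_HLOG_FILE_EXTN

      static long extractTimestamp(String name) {
        try {
          if (name.endsWith(META_EXTN)) {
            name = name.substring(0, name.indexOf(META_EXTN));
          }
          return Long.parseLong(name.substring(name.lastIndexOf('.') + 1));
        } catch (Exception e) {
          return 0; // invalid name: caller falls back to plain path comparison
        }
      }

      public static void main(String[] args) {
        System.out.println(extractTimestamp("host-60020.1234567890"));      // 1234567890
        System.out.println(extractTimestamp("host-60020.1234567890.meta")); // 1234567890
        System.out.println(extractTimestamp("not-a-wal-name"));             // 0
      }
    }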
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 2da8b05..1cbf2b7 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -444,7 +444,8 @@ public class TestHRegion extends HBaseTestCase {
         region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
 
     HLogUtil.writeCompactionMarker(region.getLog(), this.region.getTableDesc(),
-        this.region.getRegionInfo(), compactionDescriptor);
+        this.region.getRegionInfo(), compactionDescriptor,
+        this.region.incrementAndGetRegionSequenceId());
 
     Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
@@ -3926,7 +3927,7 @@ public class TestHRegion extends HBaseTestCase {
     //verify append called or not
     verify(log, expectAppend ? times(1) : never())
       .appendNoSync((HRegionInfo)any(), eq(tableName),
-        (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any());
+        (WALEdit)any(), (UUID)any(), anyLong(), (HTableDescriptor)any(), anyLong());
 
     //verify sync called or not
     if (expectSync || expectSyncFromLogSyncer) {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
index 579d249..50b9966 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/HLogPerformanceEvaluation.java
@@ -102,9 +102,11 @@ public final class HLogPerformanceEvaluation extends Configured implements Tool
           HRegionInfo hri = region.getRegionInfo();
           if (this.noSync) {
             hlog.appendNoSync(hri, hri.getTableName(), walEdit,
-              HConstants.DEFAULT_CLUSTER_ID, now, htd);
+              HConstants.DEFAULT_CLUSTER_ID, now, htd,
+              region.incrementAndGetRegionSequenceId());
           } else {
-            hlog.append(hri, hri.getTableName(), walEdit, now, htd);
+            hlog.append(hri, hri.getTableName(), walEdit, now, htd,
+              region.incrementAndGetRegionSequenceId());
           }
         }
         long totalTime = (System.currentTimeMillis() - startTime);
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
index 6ecb95d..3196e25 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLog.java
@@ -19,13 +19,19 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import static org.junit.Assert.*;
+import static org.apache.hadoop.hbase.regionserver.wal.FSHLog.LOG_FILES_COMPARATOR;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.BindException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.NavigableSet;
 import java.util.TreeMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,8 +62,12 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /** JUnit test case for HLog */
 @Category(LargeTests.class)
@@ -88,7 +98,6 @@ public class TestHLog {
     for (FileStatus dir : entries) {
       fs.delete(dir.getPath(), true);
     }
-
   }
 
   @After
@@ -136,11 +145,15 @@ public class TestHLog {
   }
 
   /**
-   * Test that with three concurrent threads we still write edits in sequence
-   * edit id order.
+   * Test that with three concurrent threads we still write edits in sequence edit id order.
+   * <p>
+ * With the per regionsequenceId, the edits in a wal will not be in a strict ascending order. + * Thereby, this test is disabled. + *

+ * @throws Exception * @throws Exception */ - @Test + @Test @Ignore public void testMaintainOrderWithConcurrentWrites() throws Exception { // Run the HPE tool with three threads writing 3000 edits each concurrently. // When done, verify that all edits were written and that the order in the @@ -157,13 +170,22 @@ public class TestHLog { */ @Test public void testSplit() throws IOException { - + HLogServices mockLogService = Mockito.mock(HLogServices.class); + final Map regionsSequenceIds = new HashMap(); + Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer( + new Answer>() { + @Override + public Map answer(InvocationOnMock invocation) throws Throwable { + return new HashMap(regionsSequenceIds); + } + }); final byte [] tableName = Bytes.toBytes(getName()); final byte [] rowName = tableName; Path logdir = new Path(hbaseDir, HConstants.HREGION_LOGDIR_NAME); HLog log = HLogFactory.createHLog(fs, hbaseDir, - HConstants.HREGION_LOGDIR_NAME, conf); + HConstants.HREGION_LOGDIR_NAME, conf, null, null, mockLogService); final int howmany = 3; + long regionSequenceId = 1l; HRegionInfo[] infos = new HRegionInfo[3]; Path tabledir = new Path(hbaseDir, getName()); fs.mkdirs(tabledir); @@ -172,6 +194,9 @@ public class TestHLog { Bytes.toBytes("" + i), Bytes.toBytes("" + (i+1)), false); fs.mkdirs(new Path(tabledir, infos[i].getEncodedName())); LOG.info("allo " + new Path(tabledir, infos[i].getEncodedName()).toString()); + // let the rollwriter doesn't archive the logs. + regionsSequenceIds.put(infos[i].getEncodedNameAsBytes(), Long.MAX_VALUE); + } HTableDescriptor htd = new HTableDescriptor(tableName); htd.addFamily(new HColumnDescriptor("column")); @@ -190,7 +215,7 @@ public class TestHLog { System.currentTimeMillis(), column)); LOG.info("Region " + i + ": " + edit); log.append(infos[i], tableName, edit, - System.currentTimeMillis(), htd); + System.currentTimeMillis(), htd, ++regionSequenceId); } } log.rollWriter(); @@ -213,6 +238,15 @@ public class TestHLog { */ @Test public void Broken_testSync() throws Exception { + HLogServices mockLogService = Mockito.mock(HLogServices.class); + final Map regionsSequenceIds = new HashMap(); + Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer( + new Answer>() { + @Override + public Map answer(InvocationOnMock invocation) throws Throwable { + return new HashMap(regionsSequenceIds); + } + }); byte [] bytes = Bytes.toBytes(getName()); // First verify that using streams all works. Path p = new Path(dir, getName() + ".fsdos"); @@ -238,11 +272,11 @@ public class TestHLog { out.close(); in.close(); - HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf); + HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir", conf, null, null, mockLogService); final int total = 20; HLog.Reader reader = null; - + long regionSequenceId = 1l; try { HRegionInfo info = new HRegionInfo(bytes, null,null, false); @@ -252,7 +286,7 @@ public class TestHLog { for (int i = 0; i < total; i++) { WALEdit kvs = new WALEdit(); kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes)); - wal.append(info, bytes, kvs, System.currentTimeMillis(), htd); + wal.append(info, bytes, kvs, System.currentTimeMillis(), htd, ++regionSequenceId); } // Now call sync and try reading. Opening a Reader before you sync just // gives you EOFE. 
@@ -270,7 +304,7 @@ public class TestHLog {
     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), bytes, bytes));
-      wal.append(info, bytes, kvs, System.currentTimeMillis(), htd);
+      wal.append(info, bytes, kvs, System.currentTimeMillis(), htd, ++regionSequenceId);
     }
     reader = HLogFactory.createReader(fs, walPath, conf);
     count = 0;
@@ -289,7 +323,7 @@ public class TestHLog {
     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), bytes, value));
-      wal.append(info, bytes, kvs, System.currentTimeMillis(), htd);
+      wal.append(info, bytes, kvs, System.currentTimeMillis(), htd, ++regionSequenceId);
     }
     // Now I should have written out lots of blocks. Sync then read.
     wal.sync();
@@ -388,14 +422,14 @@ public class TestHLog {
     HLog wal = HLogFactory.createHLog(fs, dir, "hlogdir",
       "hlogdir_archive", conf);
     final int total = 20;
-
+    long regionSequenceId = 1L;
     HTableDescriptor htd = new HTableDescriptor();
     htd.addFamily(new HColumnDescriptor(tableName));

     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
-      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd);
+      wal.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd, ++regionSequenceId);
     }
     // Now call sync to send the data to HDFS datanodes
     wal.sync();
@@ -523,7 +557,7 @@ public class TestHLog {
       HTableDescriptor htd = new HTableDescriptor();
       htd.addFamily(new HColumnDescriptor("column"));

-      log.append(info, tableName, cols, System.currentTimeMillis(), htd);
+      log.append(info, tableName, cols, System.currentTimeMillis(), htd, 0);
       log.startCacheFlush(info.getEncodedNameAsBytes());
       log.completeCacheFlush(info.getEncodedNameAsBytes());
       log.close();
@@ -579,7 +613,7 @@ public class TestHLog {
         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
       HTableDescriptor htd = new HTableDescriptor();
       htd.addFamily(new HColumnDescriptor("column"));
-      log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
+      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, 0);
       log.startCacheFlush(hri.getEncodedNameAsBytes());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
       log.close();
@@ -633,7 +667,7 @@ public class TestHLog {
         cols.add(new KeyValue(row, Bytes.toBytes("column"),
           Bytes.toBytes(Integer.toString(i)),
           timestamp, new byte[]{(byte) (i + '0')}));
-        log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
+        log.append(hri, tableName, cols, System.currentTimeMillis(), htd, 0);
       }
       assertEquals(COL_COUNT, visitor.increments);
       log.unregisterWALActionsListener(visitor);
@@ -641,7 +675,7 @@ public class TestHLog {
       cols.add(new KeyValue(row, Bytes.toBytes("column"),
         Bytes.toBytes(Integer.toString(11)),
         timestamp, new byte[]{(byte) (11 + '0')}));
-      log.append(hri, tableName, cols, System.currentTimeMillis(), htd);
+      log.append(hri, tableName, cols, System.currentTimeMillis(), htd, 0);
       assertEquals(COL_COUNT, visitor.increments);
     } finally {
       if (log != null) log.closeAndDelete();
@@ -653,9 +687,18 @@ public class TestHLog {
     LOG.info("testLogCleaning");
     final byte [] tableName = Bytes.toBytes("testLogCleaning");
     final byte [] tableName2 = Bytes.toBytes("testLogCleaning2");
+    HLogServices mockLogService = Mockito.mock(HLogServices.class);
+    final Map<byte[], Long> regionsSequenceIds = new HashMap<byte[], Long>();
+    Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer(
+      new Answer<Map<byte[], Long>>() {
+        @Override
+        public Map<byte[], Long> answer(InvocationOnMock invocation) throws Throwable {
+          return new HashMap<byte[], Long>(regionsSequenceIds);
+        }
+      });
     HLog log = HLogFactory.createHLog(fs, hbaseDir,
-      getName(), conf);
+      getName(), conf, null, null, mockLogService);
     try {
       HRegionInfo hri = new HRegionInfo(tableName,
         HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
@@ -664,37 +707,48 @@ public class TestHLog {

       // Add a single edit and make sure that rolling won't remove the file
       // Before HBASE-3198 it used to delete it
-      addEdits(log, hri, tableName, 1);
+      long regionSequenceId1 = 1L, regionSequenceId2 = 1L;
+      addEdits(log, hri, tableName, 1, regionSequenceId1);
+      regionsSequenceIds.put(hri.getEncodedNameAsBytes(), regionSequenceId1);
       log.rollWriter();
       assertEquals(1, ((FSHLog) log).getNumLogFiles());

       // See if there's anything wrong with more than 1 edit
-      addEdits(log, hri, tableName, 2);
+      addEdits(log, hri, tableName, 2, ++regionSequenceId1);
+      // the above call consumed two sequence ids, so account for the second one
+      regionSequenceId1 += 1;
+      regionsSequenceIds.put(hri.getEncodedNameAsBytes(), regionSequenceId1);
+
       log.rollWriter();
       assertEquals(2, ((FSHLog) log).getNumLogFiles());

       // Now mix edits from 2 regions, still no flushing
-      addEdits(log, hri, tableName, 1);
-      addEdits(log, hri2, tableName2, 1);
-      addEdits(log, hri, tableName, 1);
-      addEdits(log, hri2, tableName2, 1);
+      addEdits(log, hri, tableName, 1, ++regionSequenceId1);
+      addEdits(log, hri2, tableName2, 1, ++regionSequenceId2);
+      addEdits(log, hri, tableName, 1, ++regionSequenceId1);
+      addEdits(log, hri2, tableName2, 1, ++regionSequenceId2);
+      regionsSequenceIds.put(hri.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
       log.rollWriter();
       assertEquals(3, ((FSHLog) log).getNumLogFiles());

       // Flush the first region, we expect to see the first two files getting
       // archived. We need to append something or writer won't be rolled.
-      addEdits(log, hri2, tableName2, 1);
+      addEdits(log, hri2, tableName2, 1, ++regionSequenceId2);
       log.startCacheFlush(hri.getEncodedNameAsBytes());
       log.completeCacheFlush(hri.getEncodedNameAsBytes());
+      regionsSequenceIds.put(hri.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
       log.rollWriter();
       assertEquals(2, ((FSHLog) log).getNumLogFiles());

       // Flush the second region, which removes all the remaining output files
       // since the oldest was completely flushed and the two others only contain
       // flush information
-      addEdits(log, hri2, tableName2, 1);
+      addEdits(log, hri2, tableName2, 1, ++regionSequenceId2);
       log.startCacheFlush(hri2.getEncodedNameAsBytes());
       log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
       log.rollWriter();
       assertEquals(0, ((FSHLog) log).getNumLogFiles());
     } finally {
@@ -750,7 +804,7 @@ public class TestHLog {
   }

   private void addEdits(HLog log, HRegionInfo hri, byte [] tableName,
-      int times) throws IOException {
+      int times, long sequenceId) throws IOException {
     HTableDescriptor htd = new HTableDescriptor();
     htd.addFamily(new HColumnDescriptor("row"));
@@ -759,7 +813,7 @@ public class TestHLog {
       long timestamp = System.currentTimeMillis();
       WALEdit cols = new WALEdit();
       cols.add(new KeyValue(row, row, row, timestamp, row));
-      log.append(hri, tableName, cols, timestamp, htd);
+      log.append(hri, tableName, cols, timestamp, htd, sequenceId + i);
     }
   }
@@ -830,6 +884,251 @@ public class TestHLog {
     }
   }

+  @Test
+  public void testLogFileComparator() {
+    Object o = new Object();
+    ConcurrentSkipListMap<Path, Object> logs =
+        new ConcurrentSkipListMap<Path, Object>(LOG_FILES_COMPARATOR);
+
+    // test some invalid paths.
+    Path p1 = new Path("a");
+    Path p2 = new Path("b");
+    // an invalid path should fall back to the default compare.
+    assertEquals(p1.compareTo(p2), LOG_FILES_COMPARATOR.compare(p1, p2));
+    // make sure the invalid paths are added to the logs.
+    logs.put(p1, o);
+    logs.put(p2, o);
+    assertEquals(2, logs.size());
+    logs.clear();
+    // check some valid paths.
+    p1 = new Path("hdfs://localhost:9090/hbase/regionserver.00000001");
+    p2 = new Path("hdfs://localhost:9090/hbase/regionserver.00000002");
+    assertTrue(LOG_FILES_COMPARATOR.compare(p1, p2) < 0);
+
+    p1 = new Path("hdfs://localhost:9090/hbase/regionserver.0000001");
+    p2 = new Path("hdfs://localhost:9090/hbase/regionserver.00000002");
+    // the comparison is on the number part, not on the raw string.
+    assertTrue(LOG_FILES_COMPARATOR.compare(p1, p2) < 0);
+    logs.put(p1, o);
+    logs.put(p2, o);
+    NavigableSet<Path> keySet = logs.keySet();
+    assertTrue(keySet.pollFirst().equals(p1));
+    assertTrue(keySet.pollFirst().equals(p2));
+
+    // checks in case meta/regular log files are mixed (these are error conditions
+    // from an implementation point of view).
+    p1 = new Path("hdfs://localhost:9090/hbase/regionserver.0000001");
+    p2 = new Path("hdfs://localhost:9090/hbase/regionserver.0000001.meta");
+    assertEquals(0, LOG_FILES_COMPARATOR.compare(p1, p2));
+
+    p1 = new Path("hdfs://localhost:9090/hbase/regionserver.0000001");
+    p2 = new Path("hdfs://localhost:9090/hbase/regionserver.0000002.meta");
+    assertTrue(LOG_FILES_COMPARATOR.compare(p1, p2) < 0);
+  }
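For reference, the ordering contract the assertions above pin down can be written out directly: names compare by the numeric suffix after the last dot, a trailing ".meta" marker is ignored, and anything without a parseable number falls back to plain Path ordering. The class below is an illustrative sketch of that contract, not the LOG_FILES_COMPARATOR implementation this patch adds; the class name is invented for the example.

    import java.util.Comparator;
    import org.apache.hadoop.fs.Path;

    /** Sketch only: orders WAL files by their numeric suffix, ignoring ".meta". */
    public class WalFileComparatorSketch implements Comparator<Path> {
      @Override
      public int compare(Path a, Path b) {
        long na = logNumber(a), nb = logNumber(b);
        if (na < 0 || nb < 0) {
          return a.compareTo(b); // no numeric suffix: fall back to default Path order
        }
        return na < nb ? -1 : (na == nb ? 0 : 1); // compare on the number part only
      }

      /** Trailing log number of the file name, with any ".meta" suffix stripped; -1 if absent. */
      private static long logNumber(Path p) {
        String name = p.getName();
        if (name.endsWith(".meta")) {
          name = name.substring(0, name.length() - ".meta".length());
        }
        int dot = name.lastIndexOf('.');
        if (dot < 0) {
          return -1;
        }
        try {
          return Long.parseLong(name.substring(dot + 1));
        } catch (NumberFormatException e) {
          return -1;
        }
      }
    }

With this ordering, "regionserver.0000001" sorts before "regionserver.00000002" even though plain string comparison would put them the other way around, which is exactly what the test asserts.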

+
+  /**
+   * Tests the new archiving scheme. Here is what the test does:
+   * Add an edit and roll the wal. Check that the old wal isn't archived. Do it 3-4 times and see
+   * whether the wal is archived or not. Flush the region and see that the wal should be rolled.
+   * Use two tables so that wal archiving is not done even if the wal is rolled many times and
+   * the region of table t1 is flushed, because there is a region of table t2 which is not
+   * flushed. Flush that region and see that the wals are archived.
+   * @throws IOException
+   */
+  @Test
+  public void testWALArchiving() throws IOException {
+    LOG.debug("testWALArchiving");
+    final byte[] table1 = Bytes.toBytes("t1");
+    final byte[] table2 = Bytes.toBytes("t2");
+    HLogServices mockLogService = Mockito.mock(HLogServices.class);
+    final Map<byte[], Long> regionsSequenceIds = new HashMap<byte[], Long>();
+    Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer(
+      new Answer<Map<byte[], Long>>() {
+        @Override
+        public Map<byte[], Long> answer(InvocationOnMock invocation) throws Throwable {
+          return new HashMap<byte[], Long>(regionsSequenceIds);
+        }
+      });
+    HLog log = HLogFactory.createHLog(fs, hbaseDir, getName(), conf, null, null, mockLogService);
+    try {
+      // there is only one wal at this time (this method doesn't count the current wal).
+      assertEquals(0, ((FSHLog) log).getNumLogFiles());
+      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
+          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
+          HConstants.EMPTY_END_ROW);
+      // don't split the regions as we want to make this test deterministic.
+      hri1.setSplit(false);
+      hri2.setSplit(false);
+      // add a single edit and roll the wal. Make sure that rolling won't remove the file.
+      long regionSequenceId1 = 1L, regionSequenceId2 = 1L;
+      addEdits(log, hri1, table1, 1, ++regionSequenceId1);
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+      log.rollWriter();
+      assertEquals(1, ((FSHLog) log).getNumLogFiles());

+      // after the map above is populated, regionSequenceId1 must keep increasing for
+      // future updates.
+      addEdits(log, hri1, table1, 2, ++regionSequenceId1);
+      // the above call consumed two sequence ids, so account for the second one
+      regionSequenceId1 += 1;
+      // now roll the wal again and see that wal#1 is not archived.
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+
+      log.rollWriter();
+      assertEquals(2, ((FSHLog) log).getNumLogFiles());
+      // now add an entry to table1, flush the region, and roll the log.
+      // all the logs should be archived.
+      addEdits(log, hri1, table1, 1, ++regionSequenceId1);
+      log.startCacheFlush(hri1.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri1.getEncodedNameAsBytes());
+      // all the wals should be gone by now.
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+
+      log.rollWriter();
+      assertEquals(0, ((FSHLog) log).getNumLogFiles());
+      // now, add an entry in table2 and repeat the above process for table1.
+      addEdits(log, hri2, table2, 1, ++regionSequenceId2);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+
+      log.rollWriter();
+      // the wal will not be archived as there is an entry of table2.
+      // the map will look like: log1: hri2.
+      assertEquals(1, ((FSHLog) log).getNumLogFiles());
+      // repeat the process for table1.
+      addEdits(log, hri1, table1, 1, ++regionSequenceId1);
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+
+      log.rollWriter();
+      // the log map now looks like:
+      // log1: hri2 (unflushed); log2: hri1 (unflushed), hri2 (unflushed).
+      assertEquals(2, ((FSHLog) log).getNumLogFiles());
+
+      // add two entries in hri1.
+      addEdits(log, hri1, table1, 2, ++regionSequenceId1);
+      regionSequenceId1 += 1;
+      // now roll the wal again and see that wal#1 is not archived.
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+
+      log.rollWriter();
+      // the log map now looks like:
+      // log1: hri2 (unflushed); log2: hri1 (unflushed), hri2 (unflushed);
+      // log3: hri1 (unflushed), hri2 (unflushed)
+      assertEquals(3, ((FSHLog) log).getNumLogFiles());
+      // now, flush the region and roll the log.
+      // the logs should not be archived as there is unflushed table2.
+      log.startCacheFlush(hri1.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri1.getEncodedNameAsBytes());
+      log.rollWriter();
+      // the above call shouldn't have rolled the log as there is no entry in it.
+      // log1: hri2 (unflushed); log2: hri1 (flushed), hri2 (unflushed);
+      // log3: hri1 (flushed), hri2 (unflushed)
+      assertEquals(3, ((FSHLog) log).getNumLogFiles());
+
+      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+      addEdits(log, hri1, table1, 1, ++regionSequenceId1);
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+      log.rollWriter();
+      // log1: hri2 (flushed); log2: hri1 (flushed), hri2 (flushed);
+      // log3: hri1 (flushed), hri2 (flushed);
+      // log4: hri1 (unflushed), hri2 (flushed).
+      assertEquals(1, ((FSHLog) log).getNumLogFiles());
+    } finally {
+      if (log != null) log.closeAndDelete();
+    }
+  }
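The invariant this test exercises is the archiving rule itself: a rolled WAL file may be archived only once every region that wrote into it has flushed up to (or past) the highest sequence id the file holds for that region. A minimal sketch of that predicate, using the same Map<byte[], Long> bookkeeping the tests maintain; the helper class and method names are illustrative, not part of the patch.

    import java.util.Map;

    /** Sketch only: a WAL file is archivable once every region with edits in it has flushed past them. */
    public final class WalArchivingSketch {
      private WalArchivingSketch() {}

      /**
       * @param highestSeqIdInWal per region, the highest sequence id written to this WAL file
       * @param lastFlushedSeqId  per region, the highest sequence id known to be flushed
       * @return true if the file holds no unflushed edits and may be moved to the archive
       */
      public static boolean canArchive(Map<byte[], Long> highestSeqIdInWal,
          Map<byte[], Long> lastFlushedSeqId) {
        for (Map.Entry<byte[], Long> e : highestSeqIdInWal.entrySet()) {
          Long flushed = lastFlushedSeqId.get(e.getKey());
          if (flushed == null || flushed.longValue() < e.getValue().longValue()) {
            return false; // some region still has unflushed edits in this file
          }
        }
        return true;
      }
    }

Note that, like the tests, this sketch keys the maps by the encoded region name as a byte[], so it relies on the same array instance being used for lookups, as the tests do with getEncodedNameAsBytes().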
+
+  /**
+   * Tests that when we roll the wal, we pass the correct regions to flush. This test creates
+   * three tables, adds data to each of them, and checks which regions are returned to flush
+   * when we do a log roll.
+   */
+  @Test
+  public void testWALRolling() throws IOException {
+    LOG.debug("testWALRolling");
+    final byte[] table1 = Bytes.toBytes("t1");
+    final byte[] table2 = Bytes.toBytes("t2");
+    final byte[] table3 = Bytes.toBytes("t3");
+    byte[][] regionsToFlush = null;
+    final Map<byte[], Long> regionsSequenceIds = new HashMap<byte[], Long>();
+    Configuration walRollingConf = HBaseConfiguration.create(conf);
+    walRollingConf.setInt("hbase.regionserver.maxlogs", 2);
+    HLog log = null;
+    try {
+      HLogServices mockLogService = Mockito.mock(HLogServices.class);
+      Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer(
+        new Answer<Map<byte[], Long>>() {
+          @Override
+          public Map<byte[], Long> answer(InvocationOnMock invocation) throws Throwable {
+            return new HashMap<byte[], Long>(regionsSequenceIds);
+          }
+        });
+      log = HLogFactory.createHLog(fs, hbaseDir, getName(), walRollingConf, null, null,
+          mockLogService);
+      assertEquals(0, ((FSHLog) log).getNumLogFiles());
+      HRegionInfo hri1 = new HRegionInfo(table1, HConstants.EMPTY_START_ROW,
+          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri2 = new HRegionInfo(table2, HConstants.EMPTY_START_ROW,
+          HConstants.EMPTY_END_ROW);
+      HRegionInfo hri3 = new HRegionInfo(table3, HConstants.EMPTY_START_ROW,
+          HConstants.EMPTY_END_ROW);
+      hri1.setSplit(false);
+      hri2.setSplit(false);
+      hri3.setSplit(false);
+      // add a single edit and roll the wal. Make sure that rolling won't remove the file.
+      // starting with 1 as it is the minimum sequenceId of a region (whenever it opens).
+      long regionSequenceId1 = 1L, regionSequenceId2 = 1L, regionSequenceId3 = 1L;
+      addEdits(log, hri1, table1, 1, ++regionSequenceId1);
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+      regionsSequenceIds.put(hri3.getEncodedNameAsBytes(), regionSequenceId3);
+      regionsToFlush = log.rollWriter();
+      assertTrue(regionsToFlush == null);
+      assertEquals(1, ((FSHLog) log).getNumLogFiles());
+      // add entries in table2 and roll the log.
+      addEdits(log, hri2, table2, 1, ++regionSequenceId2);
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+      regionsSequenceIds.put(hri3.getEncodedNameAsBytes(), regionSequenceId3);
+      regionsToFlush = log.rollWriter();
+      assertTrue(regionsToFlush == null);
+      assertEquals(2, ((FSHLog) log).getNumLogFiles());
+      // now add an entry in table3 and roll the log. The log roller should
+      // return table1's region as the region to flush.
+      addEdits(log, hri3, table3, 1, ++regionSequenceId3);
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+      regionsSequenceIds.put(hri3.getEncodedNameAsBytes(), regionSequenceId3);
+      regionsToFlush = log.rollWriter();
+      assertTrue(regionsToFlush != null && regionsToFlush.length == 1);
+      assertTrue(hri1.getEncodedName().equals(Bytes.toString(regionsToFlush[0])));
+      assertEquals(3, ((FSHLog) log).getNumLogFiles());
+      // flush the regions of table1 and table2, add edits in table3, roll the log,
+      // and confirm only 2 log files remain.
+      log.startCacheFlush(hri1.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri1.getEncodedNameAsBytes());
+      log.startCacheFlush(hri2.getEncodedNameAsBytes());
+      log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+      addEdits(log, hri3, table3, 1, ++regionSequenceId3);
+      regionsSequenceIds.put(hri1.getEncodedNameAsBytes(), regionSequenceId1);
+      regionsSequenceIds.put(hri2.getEncodedNameAsBytes(), regionSequenceId2);
+      regionsSequenceIds.put(hri3.getEncodedNameAsBytes(), regionSequenceId3);
+      regionsToFlush = log.rollWriter();
+      // two logs should be archived.
+      assertEquals(2, ((FSHLog) log).getNumLogFiles());
+    } finally {
+      if (log != null) log.close();
+    }
+  }
+
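The behaviour asserted here suggests the roll-time decision: once the number of live WAL files exceeds hbase.regionserver.maxlogs, rollWriter() names the regions whose unflushed edits pin the oldest file, so that flushing them lets that file be archived. A sketch of such a selection under those assumptions; the patch's actual FSHLog logic may differ in detail, and the class and method names are invented for the example.

    import java.util.List;
    import java.util.SortedMap;

    /** Sketch only: when there are too many WALs, flush the regions pinning the oldest one. */
    public final class RegionsToFlushSketch {
      private RegionsToFlushSketch() {}

      /**
       * @param walToUnflushedRegions WAL files keyed by log number in age order, each mapped
       *                              to the regions that still have unflushed edits in it
       * @param maxLogs               the configured hbase.regionserver.maxlogs
       * @return encoded region names to flush, or null if we are under the limit
       */
      public static byte[][] findRegionsToFlush(
          SortedMap<Long, List<byte[]>> walToUnflushedRegions, int maxLogs) {
        if (walToUnflushedRegions.size() <= maxLogs) {
          return null; // under the limit; nothing needs flushing
        }
        // Flushing the regions of the oldest WAL lets it be archived on the next roll.
        List<byte[]> oldest = walToUnflushedRegions.get(walToUnflushedRegions.firstKey());
        return oldest.toArray(new byte[oldest.size()][]);
      }
    }

Run against the test scenario above, with maxlogs set to 2 and three WALs whose oldest file holds only table1's unflushed edits, this returns exactly table1's encoded region name, matching the assertion on regionsToFlush.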
   /**
    * Reads the WAL with and without WALTrailer.
    * @throws IOException
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
index ef7f85d..f543e35 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestHLogSplit.java
@@ -1087,12 +1087,14 @@ public class TestHLogSplit {
     log = HLogFactory.createHLog(fs, HBASEDIR, logName, conf);

     final int total = 20;
+    long regionSequenceId = 1L;
     for (int i = 0; i < total; i++) {
       WALEdit kvs = new WALEdit();
       kvs.add(new KeyValue(Bytes.toBytes(i), tableName, tableName));
       HTableDescriptor htd = new HTableDescriptor(tableName);
       htd.addFamily(new HColumnDescriptor("column"));
-      log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd);
+      log.append(regioninfo, tableName, kvs, System.currentTimeMillis(), htd,
+        ++regionSequenceId);
     }
     // Send the data to HDFS datanodes and close the HDFS writer
     log.sync();
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
index eba84a1..3658e25 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollingNoCluster.java
@@ -110,6 +110,7 @@ public class TestLogRollingNoCluster {
     @Override
     public void run() {
       this.log.info(getName() + " started");
+      long regionSequenceId = 1L;
       try {
         for (int i = 0; i < this.count; i++) {
           long now = System.currentTimeMillis();
@@ -123,7 +124,7 @@ public class TestLogRollingNoCluster {
           this.wal.append(HRegionInfo.FIRST_META_REGIONINFO,
             HTableDescriptor.META_TABLEDESC.getName(),
-            edit, now, HTableDescriptor.META_TABLEDESC);
+            edit, now, HTableDescriptor.META_TABLEDESC, ++regionSequenceId);
         }
         String msg = getName() + " finished";
         if (isException())
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
index 86047ad..c09d969 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALActionsListener.java
@@ -88,7 +88,7 @@ public class TestWALActionsListener {
     list.add(observer);
     DummyWALActionsListener laterobserver = new DummyWALActionsListener();
     HLog hlog = HLogFactory.createHLog(fs, TEST_UTIL.getDataTestDir(), logName,
-      conf, list, null);
+      conf, list, null, null);
     HRegionInfo hri = new HRegionInfo(SOME_BYTES,
       SOME_BYTES, SOME_BYTES, false);
@@ -100,7 +100,7 @@ public class TestWALActionsListener {

       HTableDescriptor htd = new HTableDescriptor();
       htd.addFamily(new HColumnDescriptor(b));
-      hlog.append(hri, b, edit, 0, htd);
+      hlog.append(hri, b, edit, 0, htd, i + 1);
       if (i == 10) {
         hlog.registerWALActionsListener(laterobserver);
       }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 8bc4a62..09017e4 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -266,32 +266,32 @@ public class TestWALReplay {
     HLog wal1 = createWAL(this.conf);
     // Add 1k to each family.
     final int countPerFamily = 1000;
+    long sequenceId = 1L;
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily, ee,
-          wal1, htd);
+          wal1, htd, sequenceId);
+      sequenceId += countPerFamily; // advance the counter by the number of appends
     }
     wal1.close();
     runWALSplit(this.conf);

     HLog wal2 = createWAL(this.conf);
     // Up the sequenceid so that these edits are after the ones added above.
-    wal2.setSequenceNumber(wal1.getSequenceNumber());
     // Add 1k to each family.
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
-          ee, wal2, htd);
+          ee, wal2, htd, sequenceId);
+      sequenceId += countPerFamily; // advance the counter by the number of appends
     }
     wal2.close();
     runWALSplit(this.conf);

     HLog wal3 = createWAL(this.conf);
-    wal3.setSequenceNumber(wal2.getSequenceNumber());
     try {
-      long wal3SeqId = wal3.getSequenceNumber();
       HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal3);
       long seqid = region.getOpenSeqNum();
-      assertTrue(seqid > wal3SeqId);
-
+      // We inserted 6k edits above, so both the open seqid and our counter end at 6001.
+      assertTrue("seqid: " + seqid + ", sequenceId: " + sequenceId, seqid == sequenceId);
       // TODO: Scan all.
       region.close();
     } finally {
@@ -387,8 +387,6 @@ public class TestWALReplay {
     HLog wal = createWAL(this.conf);
     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
     long seqid = region.getOpenSeqNum();
-    // HRegionServer usually does this. It knows the largest seqid across all regions.
-    wal.setSequenceNumber(seqid);
     boolean first = true;
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
@@ -412,8 +410,6 @@ public class TestWALReplay {
     HLog wal2 = createWAL(this.conf);
     HRegion region2 = HRegion.openHRegion(conf, this.fs, hbaseRootDir, hri, htd, wal2);
     long seqid2 = region2.getOpenSeqNum();
-    // HRegionServer usually does this. It knows the largest seqid across all regions.
-    wal2.setSequenceNumber(seqid2);
     assertTrue(seqid + result.size() < seqid2);
     final Result result1b = region2.get(g);
     assertEquals(result.size(), result1b.size());
@@ -450,8 +446,6 @@ public class TestWALReplay {
       }
     };
     long seqid3 = region3.initialize();
-    // HRegionServer usually does this. It knows the largest seqid across all regions.
-    wal3.setSequenceNumber(seqid3);
     Result result3 = region3.get(g);
     // Assert that count of cells is same as before crash.
     assertEquals(result2.size(), result3.size());
@@ -504,8 +498,6 @@ public class TestWALReplay {
     HLog wal = createWAL(this.conf);
     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
     long seqid = region.getOpenSeqNum();
-    // HRegionServer usually does this. It knows the largest seqid across all regions.
-    wal.setSequenceNumber(seqid);
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
     }
@@ -539,8 +531,6 @@ public class TestWALReplay {
     HLog wal2 = createWAL(this.conf);
     HRegion region2 = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal2);
     long seqid2 = region2.getOpenSeqNum();
-    // HRegionServer usually does this. It knows the largest seqid across all regions.
-    wal2.setSequenceNumber(seqid2);
     assertTrue(seqid + result.size() < seqid2);

     final Result result1b = region2.get(g);
@@ -597,9 +587,6 @@ public class TestWALReplay {
         CustomStoreFlusher.class.getName());
     HRegion region = new HRegion(basedir, wal, this.fs, customConf, hri, htd, rsServices);
     long seqid = region.initialize();
-    // HRegionServer usually does this. It knows the largest seqid across all
-    // regions.
-    wal.setSequenceNumber(seqid);

     int writtenRowCount = 10;
     List<HColumnDescriptor> families = new ArrayList<HColumnDescriptor>(
@@ -654,9 +641,6 @@ public class TestWALReplay {
     HRegion region2 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd, rsServices);
     long seqid2 = region2.initialize();
-    // HRegionServer usually does this. It knows the largest seqid across all
-    // regions.
-    wal2.setSequenceNumber(seqid2);

     scanner = region2.getScanner(new Scan());
     assertEquals(writtenRowCount, getScannedCount(scanner));
@@ -699,9 +683,10 @@ public class TestWALReplay {

     // Add 1k to each family.
     final int countPerFamily = 1000;
+    long sequenceId = 1L;
     for (HColumnDescriptor hcd: htd.getFamilies()) {
       addWALEdits(tableName, hri, rowName, hcd.getName(), countPerFamily,
-          ee, wal, htd);
+          ee, wal, htd, sequenceId);
     }

     // Add a cache flush, shouldn't have any effect
@@ -713,15 +698,15 @@ public class TestWALReplay {
     long now = ee.currentTimeMillis();
     edit.add(new KeyValue(rowName, Bytes.toBytes("another family"), rowName,
       now, rowName));
-    wal.append(hri, tableName, edit, now, htd);
+    wal.append(hri, tableName, edit, now, htd, sequenceId++);

     // Delete the c family to verify deletes make it over.
     edit = new WALEdit();
     now = ee.currentTimeMillis();
     edit.add(new KeyValue(rowName, Bytes.toBytes("c"), null, now,
       KeyValue.Type.DeleteFamily));
-    wal.append(hri, tableName, edit, now, htd);
-
+    wal.append(hri, tableName, edit, now, htd, sequenceId);
+    final long seqIdAfterAppends = sequenceId;
     // Sync.
     wal.sync();
     // Set down maximum recovery so we dfsclient doesn't linger retrying something
@@ -757,7 +742,7 @@ public class TestWALReplay {
     long seqid = region.initialize();
     // We flushed during init.
     assertTrue("Flushcount=" + flushcount.get(), flushcount.get() > 0);
-    assertTrue(seqid > wal.getSequenceNumber());
+    assertTrue(seqid > seqIdAfterAppends);

     Get get = new Get(rowName);
     Result result = region.get(get);
@@ -789,14 +774,11 @@ public class TestWALReplay {
     HRegion region = HRegion.openHRegion(this.conf, this.fs, hbaseRootDir, hri, htd, wal);
     long seqid = region.getOpenSeqNum();
-    // HRegionServer usually does this. It knows the largest seqid across all
-    // regions.
-    wal.setSequenceNumber(seqid);
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), countPerFamily, this.ee, region, "x");
     }
     // get the seq no after first set of entries.
-    long sequenceNumber = wal.getSequenceNumber();
+    long sequenceNumber = region.getRegionSequenceId();

     // Let us flush the region
     // But this time completeflushcache is not yet done
@@ -804,7 +786,7 @@ public class TestWALReplay {
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       addRegionEdits(rowName, hcd.getName(), 5, this.ee, region, "x");
     }
-    long lastestSeqNumber = wal.getSequenceNumber();
+    long lastestSeqNumber = region.getRegionSequenceId();
     // get the current seq no
     wal.doCompleteCacheFlush = true;
     // allow complete cache flush with the previous seq number got after first
@@ -880,7 +862,8 @@ public class TestWALReplay {

   private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
       final byte [] rowName, final byte [] family,
-      final int count, EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd)
+      final int count, EnvironmentEdge ee, final HLog wal, final HTableDescriptor htd,
+      long regionSequenceId)
   throws IOException {
     String familyStr = Bytes.toString(family);
     for (int j = 0; j < count; j++) {
@@ -889,7 +872,7 @@ public class TestWALReplay {
       WALEdit edit = new WALEdit();
       edit.add(new KeyValue(rowName, family, qualifierBytes,
         ee.currentTimeMillis(), columnBytes));
-      wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd);
+      wal.append(hri, tableName, edit, ee.currentTimeMillis(), htd, regionSequenceId + j);
     }
   }
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
index 670156e..343be8f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationHLogReaderManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.HLogServices;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -43,13 +44,18 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;

 import static org.junit.Assert.*;

 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;

 @Category(MediumTests.class)
 public class TestReplicationHLogReaderManager {
@@ -70,6 +76,8 @@ public class TestReplicationHLogReaderManager {
   private ReplicationHLogReaderManager logManager;
   private PathWatcher pathWatcher;
+  private long regionSequenceID = 1L;
+  final Map<byte[], Long> regionsSequenceIds = new HashMap<byte[], Long>();

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -92,7 +100,16 @@ public class TestReplicationHLogReaderManager {
     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
     pathWatcher = new PathWatcher();
     listeners.add(pathWatcher);
-    log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, listeners, "some server");
+    HLogServices mockLogService = Mockito.mock(HLogServices.class);
+    Mockito.when(mockLogService.obtainRegionsSequenceId()).thenAnswer(
+      new Answer<Map<byte[], Long>>() {
+        @Override
+        public Map<byte[], Long> answer(InvocationOnMock invocation) throws Throwable {
+          return new HashMap<byte[], Long>(regionsSequenceIds);
+        }
+      });
+    log = HLogFactory.createHLog(fs, hbaseDir, "test", conf, listeners, "some server",
+      mockLogService);
   }

   @After
@@ -137,6 +154,7 @@ public class TestReplicationHLogReaderManager {
     logManager.closeReader();
     oldPos = logManager.getPosition();

+    this.regionsSequenceIds.put(info.getEncodedNameAsBytes(), this.regionSequenceID);
     log.rollWriter();

     // We rolled but we still should see the end of the first log and not get data
@@ -158,7 +176,7 @@ public class TestReplicationHLogReaderManager {
   }

   private void appendToLog() throws IOException {
-    log.append(info, tableName, getWALEdit(), System.currentTimeMillis(), htd);
+    log.append(info, tableName, getWALEdit(), System.currentTimeMillis(), htd, ++regionSequenceID);
   }

   private WALEdit getWALEdit() {
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index f5c3c99..5b5237a 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -184,7 +184,7 @@ public class TestReplicationSourceManager {
     List<WALActionsListener> listeners = new ArrayList<WALActionsListener>();
     listeners.add(replication);
     HLog hlog = HLogFactory.createHLog(fs, utility.getDataTestDir(), logName,
-        conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"));
+        conf, listeners, URLEncoder.encode("regionserver:60020", "UTF8"), null);
     manager.init();

     HTableDescriptor htd = new HTableDescriptor();
@@ -197,7 +197,7 @@ public class TestReplicationSourceManager {
       LOG.info(i);
       HLogKey key = new HLogKey(hri.getRegionName(), test, seq++,
           System.currentTimeMillis(), HConstants.DEFAULT_CLUSTER_ID);
-      hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
+      hlog.append(hri, test, edit, System.currentTimeMillis(), htd, 0);
     }

     // Simulate a rapid insert that's followed
@@ -208,7 +208,7 @@ public class TestReplicationSourceManager {
     LOG.info(baseline + " and " + time);

     for (int i = 0; i < 3; i++) {
-      hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
+      hlog.append(hri, test, edit, System.currentTimeMillis(), htd, 0);
     }

     assertEquals(6, manager.getHLogs().get(slaveId).size());
@@ -218,7 +218,7 @@ public class TestReplicationSourceManager {
     manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
         "1", 0, false, false);

-    hlog.append(hri, test, edit, System.currentTimeMillis(), htd);
+    hlog.append(hri, test, edit, System.currentTimeMillis(), htd, 0);

     assertEquals(1, manager.getHLogs().size());
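The HLogServices stubbing is identical boilerplate in TestHLog, TestReplicationHLogReaderManager, and the other touched tests: mock obtainRegionsSequenceId() to answer with a snapshot of a test-maintained map. If it keeps recurring, a small test utility along these lines could consolidate it; the helper class below is hypothetical, while HLogServices and the Mockito calls are exactly as used in the patch.

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hbase.regionserver.wal.HLogServices;
    import org.mockito.Mockito;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    /** Sketch only: consolidates the HLogServices stubbing repeated across these tests. */
    public final class MockHLogServicesSketch {
      private MockHLogServicesSketch() {}

      /** Returns an HLogServices mock whose obtainRegionsSequenceId() snapshots the given map. */
      public static HLogServices mockWithSequenceIds(final Map<byte[], Long> regionsSequenceIds) {
        HLogServices mock = Mockito.mock(HLogServices.class);
        Mockito.when(mock.obtainRegionsSequenceId()).thenAnswer(
            new Answer<Map<byte[], Long>>() {
              @Override
              public Map<byte[], Long> answer(InvocationOnMock invocation) throws Throwable {
                // defensive copy so later test updates don't leak into earlier answers
                return new HashMap<byte[], Long>(regionsSequenceIds);
              }
            });
        return mock;
      }
    }

A test would then only need regionsSequenceIds plus a one-line call such as HLogFactory.createHLog(fs, hbaseDir, getName(), conf, null, null, MockHLogServicesSketch.mockWithSequenceIds(regionsSequenceIds)).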