diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java index 78d6a44..65a0a11 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStoreTracker.java @@ -24,6 +24,8 @@ import java.util.Iterator; import java.util.Map; import java.util.TreeMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; @@ -37,9 +39,15 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; @InterfaceAudience.Private @InterfaceStability.Evolving public class ProcedureStoreTracker { + private static final Log LOG = LogFactory.getLog(ProcedureStoreTracker.class); + // Key is procedure id corresponding to first bit of the bitmap. private final TreeMap map = new TreeMap<>(); + public interface Visitor { + void visit(final long procId, final boolean isUpdated, boolean isDeleted); + } + /** * If true, do not remove bits corresponding to deleted procedures. Note that this can result * in huge bitmaps overtime. @@ -121,6 +129,18 @@ public class ProcedureStoreTracker { System.out.println(); } + public void visit(final Visitor visitor) { + long procId = getStart(); + for (int i = 0; i < updated.length; ++i) { + for (int j = 0; j < BITS_PER_WORD; ++j) { + boolean isUpdated = (updated[i] & (1L << j)) != 0; + boolean isDeleted = (deleted[i] & (1L << j)) != 0; + visitor.visit(procId, isUpdated, isDeleted); + procId++; + } + } + } + public BitSetNode(final long procId, final boolean partial) { start = alignDown(procId); @@ -275,6 +295,13 @@ public class ProcedureStoreTracker { return nonZeroIntersect; } + public void mergeDeletes(BitSetNode other) { + // TODO: this is not correct + for (int i = 0; i < deleted.length; ++i) { + deleted[i] = deleted[i] & other.deleted[i]; + } + } + /** * Convert to * {@link org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureStoreTracker.TrackerNode} @@ -545,6 +572,14 @@ public class ProcedureStoreTracker { resetUpdates(); } + public boolean isUpdated(long procId) { + final Map.Entry entry = map.floorEntry(procId); + if (entry != null && entry.getValue().contains(procId)) { + return entry.getValue().isUpdated(procId); + } + return false; + } + /** * If {@link #partial} is false, returns state from the bitmap. If no state is found for * {@code procId}, returns YES. @@ -583,6 +618,10 @@ public class ProcedureStoreTracker { } } + public boolean isPartial() { + return partial; + } + public void setPartialFlag(boolean isPartial) { if (this.partial && !isPartial) { for (Map.Entry entry : map.entrySet()) { @@ -720,6 +759,13 @@ public class ProcedureStoreTracker { entry.getValue().dump(); } } + + public void visit(final Visitor visitor) { + for (Map.Entry entry : map.entrySet()) { + entry.getValue().visit(visitor); + } + } + /** * Iterates over * {@link BitSetNode}s in this.map and subtracts with corresponding ones from {@code other} @@ -751,6 +797,15 @@ public class ProcedureStoreTracker { return nonZeroIntersect; } + public void mergeDeletes(final ProcedureStoreTracker other) { + // TODO: this is not correct! + for (Map.Entry currentEntry : map.entrySet()) { + BitSetNode currentBitSetNode = currentEntry.getValue(); + Map.Entry otherTrackerEntry = other.map.floorEntry(currentEntry.getKey()); + currentBitSetNode.mergeDeletes(otherTrackerEntry.getValue()); + } + } + // ======================================================================== // Convert to/from Protocol Buffer. // ======================================================================== diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 99e7a7e..ac0d5a4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -51,10 +51,6 @@ public class ProcedureWALFile implements Comparable { private long logSize; private long timestamp; - public ProcedureStoreTracker getTracker() { - return tracker; - } - private final ProcedureStoreTracker tracker = new ProcedureStoreTracker(); public ProcedureWALFile(final FileSystem fs, final FileStatus logStatus) { @@ -122,6 +118,10 @@ public class ProcedureWALFile implements Comparable { } } + public ProcedureStoreTracker getTracker() { + return tracker; + } + public FSDataInputStream getStream() { return stream; } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java index 775ec11..0a1cf0c 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormat.java @@ -23,8 +23,10 @@ import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Iterator; +import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALTr @InterfaceAudience.Private @InterfaceStability.Evolving public final class ProcedureWALFormat { + private static final Log LOG = LogFactory.getLog(ProcedureWALFormat.class); + static final byte LOG_TYPE_STREAM = 0; static final byte LOG_TYPE_COMPACTED = 1; static final byte LOG_TYPE_MAX_VALID = 1; @@ -70,13 +74,17 @@ public final class ProcedureWALFormat { private ProcedureWALFormat() {} - public static void load(final Iterator logs, + public static void load(final List logs, final ProcedureStoreTracker tracker, final Loader loader) throws IOException { ProcedureWALFormatReader reader = new ProcedureWALFormatReader(tracker); tracker.setKeepDeletes(true); try { - while (logs.hasNext()) { - ProcedureWALFile log = logs.next(); + for (int i = logs.size() - 2; i >= 0; --i) { + ProcedureWALFile log = logs.get(i); + if (log.getTracker().isEmpty()) { + // during the load the reader will set the "updated" procs + log.getTracker().setPartialFlag(true); + } log.open(); try { reader.read(log, loader); @@ -85,6 +93,18 @@ public final class ProcedureWALFormat { } } reader.finalize(loader); + + // rebuild the tracker deleted state, if needed + logs.get(0).getTracker().setPartialFlag(false); + for (int i = 1; i < logs.size() - 1; ++i) { + ProcedureStoreTracker rebuildTracker = logs.get(i).getTracker(); + if (rebuildTracker.isPartial()) { + rebuildTracker.setPartialFlag(false); + LOG.info("REBUILD TRACKER " + logs.get(i)); + rebuildTracker.mergeDeletes(logs.get(i - 1).getTracker()); + } + } + // The tracker is now updated with all the procedures read from the logs tracker.setPartialFlag(false); tracker.resetUpdates(); diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 8678c86..1945ef5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -102,6 +102,7 @@ public class ProcedureWALFormatReader { private final WalProcedureMap procedureMap = new WalProcedureMap(1024); //private long compactionLogId; + private ProcedureStoreTracker localTracker; private long maxProcId = 0; private final ProcedureStoreTracker tracker; @@ -114,6 +115,10 @@ public class ProcedureWALFormatReader { } public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException { + // in case the log wal was corrupted, rebuild the tracker + localTracker = log.getTracker().isPartial() ? log.getTracker() : null; + LOG.info("REBUILD " + log + " TRACKER " + (localTracker != null)); + FSDataInputStream stream = log.getStream(); try { boolean hasMore = true; @@ -153,6 +158,7 @@ public class ProcedureWALFormatReader { if (!localProcedureMap.isEmpty()) { log.setProcIds(localProcedureMap.getMinProcId(), localProcedureMap.getMaxProcId()); procedureMap.mergeTail(localProcedureMap); + //if (hasFastStartSupport) { // TODO: Some procedure may be already runnables (see readInitEntry()) // (we can also check the "update map" in the log trackers) @@ -178,17 +184,6 @@ public class ProcedureWALFormatReader { if (procIter != null) loader.handleCorrupted(procIter); } - private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { - maxProcId = Math.max(maxProcId, proc.getProcId()); - if (isRequired(proc.getProcId())) { - if (LOG.isTraceEnabled()) { - LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); - } - localProcedureMap.add(proc); - tracker.setDeleted(proc.getProcId(), false); - } - } - private void readInitEntry(final ProcedureWALEntry entry) throws IOException { assert entry.getProcedureCount() == 1 : "Expected only one procedure"; @@ -229,6 +224,20 @@ public class ProcedureWALFormatReader { } } + private void loadProcedure(final ProcedureWALEntry entry, final ProcedureProtos.Procedure proc) { + maxProcId = Math.max(maxProcId, proc.getProcId()); + if (isRequired(proc.getProcId())) { + if (LOG.isTraceEnabled()) { + LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); + } + localProcedureMap.add(proc); + tracker.setDeleted(proc.getProcId(), false); + } + if (localTracker != null) { + localTracker.setDeleted(proc.getProcId(), false); + } + } + private void deleteEntry(final long procId) { if (LOG.isTraceEnabled()) { LOG.trace("delete entry " + procId); @@ -237,6 +246,10 @@ public class ProcedureWALFormatReader { localProcedureMap.remove(procId); assert !procedureMap.contains(procId); tracker.setDeleted(procId, true); + + if (localTracker != null) { + localTracker.setDeleted(procId, true); + } } private boolean isDeleted(final long procId) { @@ -781,5 +794,16 @@ public class ProcedureWALFormatReader { private int getMapSlot(final long procId) { return (int)(Procedure.getProcIdHashCode(procId) % procedureMap.length); } + + private void rebuildTracker(final ProcedureStoreTracker tracker) { + assert tracker.isEmpty() : "expected empty tracker"; + for (int i = 0; i < procedureMap.length; ++i) { + Entry entry = procedureMap[i]; + while (entry != null) { + tracker.insert(entry.getProcId()); + entry = entry.hashNext; + } + } + } } } \ No newline at end of file diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 14b8efb..5d1bc38 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -349,10 +349,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Load the old logs - Iterator it = logs.descendingIterator(); - it.next(); // Skip the current log try { - ProcedureWALFormat.load(it, storeTracker, new ProcedureWALFormat.Loader() { + ProcedureWALFormat.load(logs, storeTracker, new ProcedureWALFormat.Loader() { @Override public void setMaxProcId(long maxProcId) { loader.setMaxProcId(maxProcId); @@ -1099,20 +1097,13 @@ public class WALProcedureStore extends ProcedureStoreBase { throw new IOException(msg, e); } - if (log.isCompacted()) { - try { - log.readTrailer(); - } catch (IOException e) { - LOG.warn("Unfinished compacted log: " + logFile, e); - log.removeFile(); - return null; - } - } try { log.readTracker(); } catch (IOException e) { LOG.warn("Unable to read tracker for " + log + " - " + e.getMessage()); } + + log.close(); return log; } } diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java index 2e2a038..ff0fc89 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/TestWALProcedureStore.java @@ -361,6 +361,56 @@ public class TestWALProcedureStore { } @Test + public void testCorruptedTrailersRebuild() throws Exception { + final Procedure[] procs = new Procedure[3]; + for (int i = 0; i < procs.length; ++i) { + procs[i] = new TestSequentialProcedure(); + procStore.insert(procs[i], null); + if ((i + 1) < procs.length) { + procStore.rollWriterForTesting(); + } + } + + // Stop the store + procStore.stop(false); + + // Remove 4 byte from the trailers + final FileStatus[] logs = fs.listStatus(logDir); + assertEquals(procs.length, logs.length); + for (int i = 0; i < logs.length; ++i) { + corruptLog(logs[i], 4); + } + + // Restart the store + final LoadCounter loader = new LoadCounter(); + storeRestart(loader); + assertEquals(procs.length, loader.getLoadedCount()); + assertEquals(0, loader.getCorruptedCount()); + + // Check the Trackers + final ArrayList walFiles = procStore.getActiveLogs(); + assertEquals(procs.length + 1, walFiles.size()); + for (int i = 0; i < walFiles.size() - 1; ++i) { + final ProcedureWALFile wal = walFiles.get(i); + LOG.info("checking wal: " + wal); + assertEquals(false, wal.getTracker().isEmpty()); + wal.getTracker().dump(); + // check if the entries up to the current proc are not deleted + for (int j = 0; j <= i; ++j) { + final long procId = procs[j].getProcId(); + assertEquals(ProcedureStoreTracker.DeleteState.NO, wal.getTracker().isDeleted(procId)); + assertEquals(j == i, wal.getTracker().isUpdated(procId)); + } + // check if the entries above the current proc are deleted/not updated + for (int j = i + 1; j < procs.length; ++j) { + final long procId = procs[j].getProcId(); + assertEquals(ProcedureStoreTracker.DeleteState.YES, wal.getTracker().isDeleted(procId)); + assertEquals(false, wal.getTracker().isUpdated(procId)); + } + } + } + + @Test public void testCorruptedEntries() throws Exception { // Insert something for (int i = 0; i < 100; ++i) {