Index: src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (revision 1000684) +++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (working copy) @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.EOFException; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.lang.reflect.Constructor; @@ -1790,83 +1791,85 @@ LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId); HLog.Reader reader = HLog.getReader(this.fs, edits, conf); try { - return replayRecoveredEdits(reader, minSeqId, reporter); - } finally { - reader.close(); - } - } - - /* @param reader Reader against file of recovered edits. - * @param minSeqId Any edit found in split editlogs needs to be in excess of - * this minSeqId to be applied, else its skipped. - * @param reporter - * @return the sequence id of the last edit added to this region out of the - * recovered edits log or minSeqId if nothing added from editlogs. - * @throws IOException - */ - private long replayRecoveredEdits(final HLog.Reader reader, - final long minSeqId, final Progressable reporter) - throws IOException { long currentEditSeqId = minSeqId; long firstSeqIdInLog = -1; long skippedEdits = 0; long editsCount = 0; HLog.Entry entry; Store store = null; - // How many edits to apply before we send a progress report. - int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); - while ((entry = reader.next()) != null) { - HLogKey key = entry.getKey(); - WALEdit val = entry.getEdit(); - if (firstSeqIdInLog == -1) { - firstSeqIdInLog = key.getLogSeqNum(); - } - // Now, figure if we should skip this edit. - if (key.getLogSeqNum() <= currentEditSeqId) { - skippedEdits++; - continue; - } - currentEditSeqId = key.getLogSeqNum(); - boolean flush = false; - for (KeyValue kv: val.getKeyValues()) { - // Check this edit is for me. Also, guard against writing the special - // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || - !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) { + + try { + // How many edits to apply before we send a progress report. + int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); + while ((entry = reader.next()) != null) { + HLogKey key = entry.getKey(); + WALEdit val = entry.getEdit(); + if (firstSeqIdInLog == -1) { + firstSeqIdInLog = key.getLogSeqNum(); + } + // Now, figure if we should skip this edit. + if (key.getLogSeqNum() <= currentEditSeqId) { skippedEdits++; continue; } - // Figure which store the edit is meant for. - if (store == null || !kv.matchingFamily(store.getFamily().getName())) { - store = this.stores.get(kv.getFamily()); + currentEditSeqId = key.getLogSeqNum(); + boolean flush = false; + for (KeyValue kv: val.getKeyValues()) { + // Check this edit is for me. Also, guard against writing the special + // METACOLUMN info such as HBASE::CACHEFLUSH entries + if (kv.matchingFamily(HLog.METAFAMILY) || + !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) { + skippedEdits++; + continue; + } + // Figure which store the edit is meant for. + if (store == null || !kv.matchingFamily(store.getFamily().getName())) { + store = this.stores.get(kv.getFamily()); + } + if (store == null) { + // This should never happen. Perhaps schema was changed between + // crash and redeploy? + LOG.warn("No family for " + kv); + skippedEdits++; + continue; + } + // Once we are over the limit, restoreEdit will keep returning true to + // flush -- but don't flush until we've played all the kvs that make up + // the WALEdit. + flush = restoreEdit(store, kv); + editsCount++; } - if (store == null) { - // This should never happen. Perhaps schema was changed between - // crash and redeploy? - LOG.warn("No family for " + kv); - skippedEdits++; - continue; + if (flush) internalFlushcache(null, currentEditSeqId); + + // Every 'interval' edits, tell the reporter we're making progress. + // Have seen 60k edits taking 3minutes to complete. + if (reporter != null && (editsCount % interval) == 0) { + reporter.progress(); } - // Once we are over the limit, restoreEdit will keep returning true to - // flush -- but don't flush until we've played all the kvs that make up - // the WALEdit. - flush = restoreEdit(store, kv); - editsCount++; - } - if (flush) internalFlushcache(null, currentEditSeqId); - - // Every 'interval' edits, tell the reporter we're making progress. - // Have seen 60k edits taking 3minutes to complete. - if (reporter != null && (editsCount % interval) == 0) { - reporter.progress(); } + } catch (EOFException eof) { + Path p = HLog.moveAsideBadEditsFile(fs, edits); + LOG.warn("Encountered EOF. Most likely due to Master failure during " + + "log spliting, so we have this data in another edit. " + + "Continuing, but renaming " + edits + " as " + p, eof); + } catch (IOException ioe) { + if (ioe.getMessage().startsWith("File is corrupt")) { + Path p = HLog.moveAsideBadEditsFile(fs, edits); + LOG.warn("File corruption encountered! " + + "Continuing, but renaming " + edits + " as " + p, ioe); + } else { + throw ioe; + } } if (LOG.isDebugEnabled()) { LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + - ", firstSequenceidInLog=" + firstSeqIdInLog + - ", maxSequenceidInLog=" + currentEditSeqId); + ", firstSequenceidInLog=" + firstSeqIdInLog + + ", maxSequenceidInLog=" + currentEditSeqId); } return currentEditSeqId; + } finally { + reader.close(); + } } /**