diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 18ea7d7..a812484 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -29,8 +29,8 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.text.ParseException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -52,6 +52,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; @@ -188,7 +191,7 @@ public class WALSplitter { final FileStatus[] logfiles = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null); List splits = new ArrayList<>(); - if (logfiles != null && logfiles.length > 0) { + if (ArrayUtils.isNotEmpty(logfiles)) { for (FileStatus logfile: logfiles) { WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null); if (s.splitLogFile(logfile, null)) { @@ -230,7 +233,7 @@ public class WALSplitter { this.fileBeingSplit = logfile; try { long logLength = logfile.getLen(); - LOG.info("Splitting WAL=" + logPath + ", length=" + logLength); + LOG.info("Splitting WAL={}, length={}", logPath, logLength); status.setStatus("Opening log file"); if (reporter != null && !reporter.progress()) { progress_failed = true; @@ -238,7 +241,7 @@ public class WALSplitter { } logFileReader = getReader(logfile, skipErrors, reporter); if (logFileReader == null) { - LOG.warn("Nothing to split in WAL=" + logPath); + LOG.warn("Nothing to split in WAL={}", logPath); return true; } int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3); @@ -302,7 +305,7 @@ public class WALSplitter { iie.initCause(ie); throw iie; } catch (CorruptedLogFileException e) { - LOG.warn("Could not parse, corrupted WAL=" + logPath, e); + LOG.warn("Could not parse, corrupted WAL={}", logPath, e); if (splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs); @@ -315,14 +318,13 @@ public class WALSplitter { e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; throw e; } finally { - LOG.debug("Finishing writing output logs and closing down."); + LOG.debug("Finishing writing output logs and closing down"); try { if (null != logFileReader) { logFileReader.close(); } } catch (IOException exception) { - LOG.warn("Could not close WAL reader: " + exception.getMessage()); - LOG.debug("exception details", exception); + LOG.warn("Could not close WAL reader", exception); } try { if (outputSinkStarted) { @@ -402,11 +404,11 @@ public class WALSplitter { final FileSystem fs, final Configuration conf) throws IOException { final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { - LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to " - + corruptDir); + LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", + corruptDir); } if (!fs.mkdirs(corruptDir)) { - LOG.info("Unable to mkdir " + corruptDir); + LOG.info("Unable to mkdir {}", corruptDir); } fs.mkdirs(oldLogDir); @@ -416,9 +418,9 @@ public class WALSplitter { Path p = new Path(corruptDir, corrupted.getName()); if (fs.exists(corrupted)) { if (!fs.rename(corrupted, p)) { - LOG.warn("Unable to move corrupted log " + corrupted + " to " + p); + LOG.warn("Unable to move corrupted log {} to {}", corrupted, p); } else { - LOG.warn("Moved corrupted log " + corrupted + " to " + p); + LOG.warn("Moved corrupted log {} to {}", corrupted, p); } } } @@ -427,9 +429,9 @@ public class WALSplitter { Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); if (fs.exists(p)) { if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) { - LOG.warn("Unable to move " + p + " to " + newPath); + LOG.warn("Unable to move {} to {}", p, newPath); } else { - LOG.info("Archived processed log " + p + " to " + newPath); + LOG.info("Archived processed log {} to {}", p , newPath); } } } @@ -459,9 +461,9 @@ public class WALSplitter { Path dir = getRegionDirRecoveredEditsDir(regiondir); if (!fs.exists(regiondir)) { - LOG.info("This region's directory doesn't exist: " - + regiondir.toString() + ". It is very likely that it was" + - " already split so it's safe to discard those edits."); + LOG.info("This region's directory doesn't exist: {}." + + "It is very likely that it was already split so it is " + + "safe to discard those edits.", regiondir); return null; } if (fs.exists(dir) && fs.isFile(dir)) { @@ -471,16 +473,16 @@ public class WALSplitter { } tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); - LOG.warn("Found existing old file: " + dir + ". It could be some " + LOG.warn("Found existing old file: {}. It could be some " + "leftover of an old installation. It should be a folder instead. " - + "So moving it to " + tmp); + + "So moving it to {}", dir, tmp); if (!fs.rename(dir, tmp)) { - LOG.warn("Failed to sideline old file " + dir); + LOG.warn("Failed to sideline old file {}", dir); } } if (!fs.exists(dir) && !fs.mkdirs(dir)) { - LOG.warn("mkdir failed on " + dir); + LOG.warn("mkdir failed on {}", dir); } // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure @@ -539,8 +541,9 @@ public class WALSplitter { final Path regiondir) throws IOException { NavigableSet filesSorted = new TreeSet<>(); Path editsdir = getRegionDirRecoveredEditsDir(regiondir); - if (!fs.exists(editsdir)) + if (!fs.exists(editsdir)) { return filesSorted; + } FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() { @Override public boolean accept(Path p) { @@ -562,16 +565,13 @@ public class WALSplitter { result = false; } } catch (IOException e) { - LOG.warn("Failed isFile check on " + p); + LOG.warn("Failed isFile check on {}", p); } return result; } }); - if (files == null) { - return filesSorted; - } - for (FileStatus status : files) { - filesSorted.add(status.getPath()); + if (ArrayUtils.isNotEmpty(files)) { + Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); } return filesSorted; } @@ -590,7 +590,7 @@ public class WALSplitter { Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis()); if (!fs.rename(edits, moveAsideName)) { - LOG.warn("Rename failed from " + edits + " to " + moveAsideName); + LOG.warn("Rename failed from {} to {}", edits, moveAsideName); } return moveAsideName; } @@ -640,7 +640,7 @@ public class WALSplitter { - SEQUENCE_ID_FILE_SUFFIX_LENGTH)); maxSeqId = Math.max(tmpSeqId, maxSeqId); } catch (NumberFormatException ex) { - LOG.warn("Invalid SeqId File Name=" + fileName); + LOG.warn("Invalid SeqId File Name={}", fileName); } } } @@ -657,10 +657,8 @@ public class WALSplitter { if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) { throw new IOException("Failed to create SeqId file:" + newSeqIdFile); } - if (LOG.isDebugEnabled()) { - LOG.debug("Wrote file=" + newSeqIdFile + ", newSeqId=" + newSeqId - + ", maxSeqId=" + maxSeqId); - } + LOG.debug("Wrote file={}, newSeqId={}, maxSeqId={}", newSeqIdFile, + newSeqId, maxSeqId); } catch (FileAlreadyExistsException ignored) { // latest hdfs throws this exception. it's all right if newSeqIdFile already exists } @@ -668,10 +666,9 @@ public class WALSplitter { // remove old ones if (files != null) { for (FileStatus status : files) { - if (newSeqIdFile.equals(status.getPath())) { - continue; + if (!newSeqIdFile.equals(status.getPath())) { + fs.delete(status.getPath(), false); } - fs.delete(status.getPath(), false); } } return newSeqId; @@ -695,7 +692,7 @@ public class WALSplitter { // zero length even if the file has been sync'd. Revisit if HDFS-376 or // HDFS-878 is committed. if (length <= 0) { - LOG.warn("File " + path + " might be still open, length is 0"); + LOG.warn("File {} might be still open, length is 0", path); } try { @@ -709,17 +706,15 @@ public class WALSplitter { // ignore if this is the last log in sequence. // TODO is this scenario still possible if the log has been // recovered (i.e. closed) - LOG.warn("Could not open " + path + " for reading. File is empty", e); - return null; - } else { - // EOFException being ignored - return null; + LOG.warn("Could not open {} for reading. File is empty", path, e); } + // EOFException being ignored + return null; } } catch (IOException e) { if (e instanceof FileNotFoundException) { // A wal file may not exist anymore. Nothing can be recovered so move on - LOG.warn("File " + path + " doesn't exist anymore.", e); + LOG.warn("File {} does not exist anymore", path, e); return null; } if (!skipErrors || e instanceof InterruptedIOException) { @@ -740,7 +735,7 @@ public class WALSplitter { return in.next(); } catch (EOFException eof) { // truncated files are expected if a RS crashes (see HBASE-2643) - LOG.info("EOF from wal " + path + ". continuing"); + LOG.info("EOF from wal {}. Continuing.", path); return null; } catch (IOException e) { // If the IOE resulted from bad file format, @@ -748,8 +743,7 @@ public class WALSplitter { if (e.getCause() != null && (e.getCause() instanceof ParseException || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { - LOG.warn("Parse exception " + e.getCause().toString() + " from wal " - + path + ". continuing"); + LOG.warn("Parse exception from wal {}. Continuing", path, e); return null; } if (!skipErrors) { @@ -871,8 +865,7 @@ public class WALSplitter { synchronized (controller.dataAvailable) { totalBuffered += incrHeap; while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { - LOG.debug("Used " + totalBuffered + - " bytes of buffered edits, waiting for IO threads..."); + LOG.debug("Used {} bytes of buffered edits, waiting for IO threads.", totalBuffered); controller.dataAvailable.wait(2000); } controller.dataAvailable.notifyAll(); @@ -951,7 +944,7 @@ public class WALSplitter { RegionEntryBuffer(TableName tableName, byte[] region) { this.tableName = tableName; this.encodedRegionName = region; - this.entryBuffer = new LinkedList<>(); + this.entryBuffer = new ArrayList<>(); } long appendEntry(Entry entry) { @@ -1012,7 +1005,7 @@ public class WALSplitter { } private void doRun() throws IOException { - if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting"); + LOG.trace("Writer thread starting"); while (true) { RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { @@ -1163,7 +1156,7 @@ public class WALSplitter { } } controller.checkForErrors(); - LOG.info(this.writerThreads.size() + " split writers finished; closing..."); + LOG.info("{} split writers finished; closing...", this.writerThreads.size()); return (!progress_failed); } @@ -1230,7 +1223,7 @@ public class WALSplitter { } finally { result = close(); List thrown = closeLogWriters(null); - if (thrown != null && !thrown.isEmpty()) { + if (CollectionUtils.isNotEmpty(thrown)) { throw MultipleIOException.createIOException(thrown); } } @@ -1249,25 +1242,22 @@ public class WALSplitter { dstMinLogSeqNum = entry.getKey().getSequenceId(); } } catch (EOFException e) { - if (LOG.isDebugEnabled()) { - LOG.debug( - "Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?", - e); - } + LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", + dst, e); } if (wap.minLogSeqNum < dstMinLogSeqNum) { LOG.warn("Found existing old edits file. It could be the result of a previous failed" + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" + fs.getFileStatus(dst).getLen()); if (!fs.delete(dst, false)) { - LOG.warn("Failed deleting of old " + dst); + LOG.warn("Failed deleting of old {}", dst); throw new IOException("Failed deleting of old " + dst); } } else { LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p + ", length=" + fs.getFileStatus(wap.p).getLen()); if (!fs.delete(wap.p, false)) { - LOG.warn("Failed deleting of " + wap.p); + LOG.warn("Failed deleting of {}", wap.p); throw new IOException("Failed deleting of " + wap.p); } } @@ -1301,11 +1291,11 @@ public class WALSplitter { @Override public Void call() throws Exception { WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p); + LOG.trace("Closing {}", wap.p); try { wap.w.close(); } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); + LOG.error("Couldn't close log at {}", wap.p, ioe); thrown.add(ioe); return null; } @@ -1317,8 +1307,8 @@ public class WALSplitter { if (wap.editsWritten == 0) { // just remove the empty recovered.edits file if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { - LOG.warn("Failed deleting empty " + wap.p); - throw new IOException("Failed deleting empty " + wap.p); + LOG.warn("Failed deleting empty {}", wap.p); + throw new IOException("Failed deleting empty " + wap.p); } return null; } @@ -1336,10 +1326,10 @@ public class WALSplitter { if (!fs.rename(wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } - LOG.info("Rename " + wap.p + " to " + dst); + LOG.info("Rename {} to {}", wap.p, dst); } } catch (IOException ioe) { - LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); + LOG.error("Could not rename {} to {}", wap.p, dst, ioe); thrown.add(ioe); return null; } @@ -1409,7 +1399,7 @@ public class WALSplitter { wap = (WriterAndPath) tmpWAP; wap.w.close(); } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); + LOG.error("Couldn't close log at {}", wap.p, ioe); thrown.add(ioe); continue; } @@ -1461,18 +1451,18 @@ public class WALSplitter { + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" + fs.getFileStatus(regionedits).getLen()); if (!fs.delete(regionedits, false)) { - LOG.warn("Failed delete of old " + regionedits); + LOG.warn("Failed delete of old {}", regionedits); } } Writer w = createWriter(regionedits); - LOG.debug("Creating writer path=" + regionedits); + LOG.debug("Creating writer path={}", regionedits); return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId()); } private void filterCellByStore(Entry logEntry) { Map maxSeqIdInStores = regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); - if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) { + if (MapUtils.isEmpty(maxSeqIdInStores)) { return; } // Create the array list for the cells that aren't filtered. @@ -1516,11 +1506,9 @@ public class WALSplitter { if (wap == null) { wap = getWriterAndPath(logEntry); if (wap == null) { - if (LOG.isTraceEnabled()) { - // This log spews the full edit. Can be massive in the log. Enable only debugging - // WAL lost edit issues. - LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry); - } + // This log spews the full edit. Can be massive in the log. Enable only debugging + // WAL lost edit issues. + LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); return; } } @@ -1539,16 +1527,16 @@ public class WALSplitter { } catch (IOException e) { e = e instanceof RemoteException ? ((RemoteException)e).unwrapRemoteException() : e; - LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e); + LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e); throw e; } } @Override public boolean keepRegionEvent(Entry entry) { - ArrayList cells = entry.getEdit().getCells(); - for (int i = 0; i < cells.size(); i++) { - if (WALEdit.isCompactionMarker(cells.get(i))) { + List cells = entry.getEdit().getCells(); + for (Cell cell : cells) { + if (WALEdit.isCompactionMarker(cell)) { return true; } } @@ -1676,8 +1664,7 @@ public class WALSplitter { throws IOException { if (entry == null) { - // return an empty array - return new ArrayList<>(); + return Collections.emptyList(); } long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? @@ -1688,7 +1675,9 @@ public class WALSplitter { Mutation m = null; WALKeyImpl key = null; WALEdit val = null; - if (logEntry != null) val = new WALEdit(); + if (logEntry != null) { + val = new WALEdit(); + } for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off