From 7d83e3a1986008f70ec1f5675c26d613ed264578 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=B0=8F=E4=BF=9D?= Date: Thu, 20 Dec 2018 17:18:45 +0800 Subject: [PATCH] compaction progress error --- .../hadoop/hbase/mob/DefaultMobStoreCompactor.java | 22 +++++++--- .../compactions/CompactionProgress.java | 8 ++++ .../hbase/regionserver/compactions/Compactor.java | 50 ++++++++++++++++++---- 3 files changed, 64 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 01c195a4d3..db5b43e7b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; @@ -161,12 +162,13 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { * @param throughputController The compaction throughput controller. * @param major Is a major compaction. * @param numofFilesToCompact the number of files to compact + * @param compactionProgress currentProgress of the thread * @return Whether compaction ended; false if it was interrupted for any reason. */ @Override protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact, CompactionProgress compactionProgress) throws IOException { long bytesWrittenProgressForCloseCheck = 0; long bytesWrittenProgressForLog = 0; long bytesWrittenProgressForShippedCall = 0; @@ -282,8 +284,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { cellsSizeCompactedToMob += c.getValueLength(); } int len = KeyValueUtil.length(c); - ++progress.currentCompactedKVs; - progress.totalCompactedSize += len; + ++compactionProgress.currentCompactedKVs; + compactionProgress.totalCompactedSize += len; bytesWrittenProgressForShippedCall += len; if (LOG.isDebugEnabled()) { bytesWrittenProgressForLog += len; @@ -295,7 +297,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { bytesWrittenProgressForCloseCheck = 0; if (!store.areWritesEnabled()) { - progress.cancel(); + compactionProgress.cancel(); return false; } } @@ -313,7 +315,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { LOG.debug("Compaction progress: " + compactionName + " " - + progress + + compactionProgress + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)) + ", throughputController is " + throughputController); @@ -325,7 +327,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { } while (hasMore); finished = true; } catch (InterruptedException e) { - progress.cancel(); + compactionProgress.cancel(); + synchronized (progresses){ + progresses.remove(compactionProgress); + } throw new InterruptedIOException( "Interrupted while control throughput of compacting " + compactionName); } finally { @@ -364,7 +369,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); - progress.complete(); + compactionProgress.complete(); + synchronized (progresses){ + progresses.remove(compactionProgress); + } return true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java index 577276e85e..fad6409d32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionProgress.java @@ -91,6 +91,14 @@ public class CompactionProgress { return currentCompactedKVs; } + /** + * set totalCompactingKvs in currently running compaction + * @param totalCompactingKVs + */ + public void setTotalCompactingKVs(long totalCompactingKVs) { + this.totalCompactingKVs = totalCompactingKVs; + } + /** * @return the total data size processed by the currently running compaction, in bytes */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 83690a95c7..9e847099e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -72,6 +72,7 @@ public abstract class Compactor { private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; protected volatile CompactionProgress progress; + protected final List progresses = new ArrayList<>(); protected final Configuration conf; protected final HStore store; @@ -112,6 +113,21 @@ public abstract class Compactor { } public CompactionProgress getProgress() { + synchronized (progresses){ + if(progresses.size() > 0){ + progress = new CompactionProgress(0); + }else{ + if(progress != null){ + progress.complete(); + } + } + for (CompactionProgress p: + progresses) { + progress.setTotalCompactingKVs(p.getTotalCompactingKVs() + progress.getTotalCompactingKVs()); + progress.currentCompactedKVs += p.getCurrentCompactedKvs(); + progress.totalCompactedSize += p.getTotalCompactedSize(); + } + } return this.progress; } @@ -294,7 +310,13 @@ public abstract class Compactor { InternalScannerFactory scannerFactory, CellSinkFactory sinkFactory, ThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); - this.progress = new CompactionProgress(fd.maxKeyCount); + CompactionProgress pro = new CompactionProgress(fd.maxKeyCount); + synchronized (progresses){ + this.progresses.add(pro); + if(this.progress == null){ + this.progress = pro; + } + } // Find the smallest read point across all the Scanners. long smallestReadPoint = getSmallestReadPoint(); @@ -325,7 +347,7 @@ public abstract class Compactor { } writer = sinkFactory.createWriter(scanner, fd, dropCache); finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, - throughputController, request.isAllFiles(), request.getFiles().size()); + throughputController, request.isAllFiles(), request.getFiles().size(),pro); if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); @@ -356,11 +378,12 @@ public abstract class Compactor { * smallestReadPoint * @param major Is a major compaction. * @param numofFilesToCompact the number of files to compact + * @param compactionProgress currentProgress of the thread * @return Whether compaction ended; false if it was interrupted for some reason. */ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { + boolean major, int numofFilesToCompact,CompactionProgress compactionProgress) throws IOException { assert writer instanceof ShipperListener; long bytesWrittenProgressForCloseCheck = 0; long bytesWrittenProgressForLog = 0; @@ -402,8 +425,8 @@ public abstract class Compactor { } writer.append(c); int len = KeyValueUtil.length(c); - ++progress.currentCompactedKVs; - progress.totalCompactedSize += len; + ++compactionProgress.currentCompactedKVs; + compactionProgress.totalCompactedSize += len; bytesWrittenProgressForShippedCall += len; if (LOG.isDebugEnabled()) { bytesWrittenProgressForLog += len; @@ -415,7 +438,10 @@ public abstract class Compactor { if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { bytesWrittenProgressForCloseCheck = 0; if (!store.areWritesEnabled()) { - progress.cancel(); + compactionProgress.cancel(); + synchronized (progresses){ + this.progresses.remove(compactionProgress); + } return false; } } @@ -451,7 +477,7 @@ public abstract class Compactor { LOG.debug("Compaction progress: " + compactionName + " " - + progress + + compactionProgress + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0)) + ", throughputController is " + throughputController); @@ -462,13 +488,19 @@ public abstract class Compactor { cells.clear(); } while (hasMore); } catch (InterruptedException e) { - progress.cancel(); + compactionProgress.cancel(); + synchronized (progresses){ + this.progresses.remove(compactionProgress); + } throw new InterruptedIOException("Interrupted while control throughput of compacting " + compactionName); } finally { throughputController.finish(compactionName); } - progress.complete(); + compactionProgress.complete(); + synchronized (progresses){ + this.progresses.remove(compactionProgress); + } return true; } -- 2.14.1