diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 5150194..c68d3bb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -1206,8 +1206,8 @@ public void majorCompactRegion(final byte[] regionName, final byte[] columnFamil * @param tableName table or region to compact * @param columnFamily column family within a table or region * @param major True if we are to do a major compaction. + * @param compactType {@link org.apache.hadoop.hbase.client.CompactType} * @throws IOException if a remote or network exception occurs - * @throws InterruptedException */ private void compact(final TableName tableName, final byte[] columnFamily,final boolean major, CompactType compactType) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 936a6fd..697286c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.mob; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -36,17 +37,21 @@ import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Compact passed set of files in the mob-enabled column family. @@ -164,12 +169,20 @@ public DefaultMobStoreCompactor(Configuration conf, Store store) { protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, boolean major, int numofFilesToCompact) throws IOException { - int bytesWritten = 0; + long bytesWrittenProgressForCloseCheck = 0; + long bytesWrittenProgressForLog = 0; + long bytesWrittenProgressForShippedCall = 0; // Since scanner.next() can return 'false' but still be delivering data, // we have to use a do/while loop. 
List<Cell> cells = new ArrayList<>(); // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME - int closeCheckInterval = HStore.getCloseCheckInterval(); + int closeCheckSizeLimit = HStore.getCloseCheckInterval(); + long lastMillis = 0; + if (LOG.isDebugEnabled()) { + lastMillis = EnvironmentEdgeManager.currentTime(); + } + String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); + long now = 0; boolean hasMore; Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); byte[] fileName = null; @@ -177,25 +190,41 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel long mobCells = 0, deleteMarkersCount = 0; long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; + boolean finished = false; + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + throughputController.start(compactionName); + KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; + long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getFamily().getBlocksize(); try { try { // If the mob file writer could not be created, directly write the cell to the store file. mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, - store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + compactionCompression, store.getRegionInfo().getStartKey(), true); fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); } catch (IOException e) { - LOG.error("Failed to create mob writer, " + LOG.warn("Failed to create mob writer, " + "we will continue the compaction by writing MOB cells directly in store files", e); } - delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, - store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + if (major) { + try { + delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), + fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey()); + } catch (IOException e) { + LOG.warn( + "Failed to create del writer, " + + "we will continue the compaction by writing delete markers directly in store files", + e); + } + } do { hasMore = scanner.next(cells, scannerContext); + if (LOG.isDebugEnabled()) { + now = EnvironmentEdgeManager.currentTime(); + } for (Cell c : cells) { if (major && CellUtil.isDelete(c)) { - if (MobUtils.isMobReferenceCell(c)) { + if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) { // Directly write it to a store file writer.append(c); } else {
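
For reference, the throttling contract these hunks introduce reduces to the pattern below. This is a condensed sketch rather than the patch's exact code: the cell source and exception message are illustrative, while the start/control/finish calls and ThroughputControlUtil.getNameForThrottling are the same ones used above.

    String name = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    throughputController.start(name);
    try {
      for (Cell c : cells) {
        int len = KeyValueUtil.length(c);
        writer.append(c);
        // May sleep here to keep the aggregate compaction write rate under the limit.
        throughputController.control(name, len);
      }
    } catch (InterruptedException e) {
      // Surfaced as an IOException subtype so existing signatures need not change.
      throw new InterruptedIOException("Interrupted while throttling " + name);
    } finally {
      throughputController.finish(name);
    }
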
@@ -254,56 +283,83 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel cellsCountCompactedToMob++; cellsSizeCompactedToMob += c.getValueLength(); } + int len = KeyValueUtil.length(c); ++progress.currentCompactedKVs; + progress.totalCompactedSize += len; + bytesWrittenProgressForShippedCall += len; + if (LOG.isDebugEnabled()) { + bytesWrittenProgressForLog += len; + } + throughputController.control(compactionName, len); // check periodically to see if a system stop is requested - if (closeCheckInterval > 0) { - bytesWritten += KeyValueUtil.length(c); - if (bytesWritten > closeCheckInterval) { - bytesWritten = 0; + if (closeCheckSizeLimit > 0) { + bytesWrittenProgressForCloseCheck += len; + if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { + bytesWrittenProgressForCloseCheck = 0; if (!store.areWritesEnabled()) { progress.cancel(); return false; } } } + if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { + ((ShipperListener)writer).beforeShipped(); + kvs.shipped(); + bytesWrittenProgressForShippedCall = 0; + } + } + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { + LOG.debug("Compaction progress: " + + compactionName + + " " + + progress + + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0) + / ((now - lastMillis) / 1000.0)) + ", throughputController is " + + throughputController); + lastMillis = now; + bytesWrittenProgressForLog = 0; + } } cells.clear(); } while (hasMore); + finished = true; + } catch (InterruptedException e) { + progress.cancel(); + throw new InterruptedIOException( + "Interrupted while controlling throughput of compacting " + compactionName); } finally { - if (mobFileWriter != null) { - mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); - mobFileWriter.close(); + throughputController.finish(compactionName); + if (!finished && mobFileWriter != null) { + abortWriter(mobFileWriter); } - if (delFileWriter != null) { + if (!finished && delFileWriter != null) { + abortWriter(delFileWriter); + } + } + if (delFileWriter != null) { + if (deleteMarkersCount > 0) { + // If the del file is not empty, commit it. + // If the commit fails, the compaction is performed again. delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); delFileWriter.close(); + mobStore.commitFile(delFileWriter.getPath(), path); + } else { + // If the del file is empty, delete it instead of committing. + abortWriter(delFileWriter); } } if (mobFileWriter != null) { if (mobCells > 0) { // If the mob file is not empty, commit it. + mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); + mobFileWriter.close(); mobStore.commitFile(mobFileWriter.getPath(), path); } else { - try { - // If the mob file is empty, delete it instead of committing. - store.getFileSystem().delete(mobFileWriter.getPath(), true); - } catch (IOException e) { - LOG.error("Failed to delete the temp mob file", e); - } - } - } - if (delFileWriter != null) { - if (deleteMarkersCount > 0) { - // If the del file is not empty, commit it. - // If the commit fails, the compaction is re-performed again. - mobStore.commitFile(delFileWriter.getPath(), path); - } else { - try { - // If the del file is empty, delete it instead of committing. - store.getFileSystem().delete(delFileWriter.getPath(), true); - } catch (IOException e) { - LOG.error("Failed to delete the temp del file", e); - } + // If the mob file is empty, delete it instead of committing. + abortWriter(mobFileWriter); } } mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
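
The restructured cleanup above follows a success-flag idiom: appendMetadata/close move off the finally path, the finally block aborts writers only when the loop did not finish, and an empty writer is aborted instead of committed. Schematically (writeAllCells and commitIfNonEmpty are hypothetical stand-ins for the loop and commit logic shown in the hunk):

    boolean finished = false;
    try {
      writeAllCells();               // the scan/append loop; may throw
      finished = true;
    } finally {
      throughputController.finish(compactionName);
      if (!finished && mobFileWriter != null) {
        abortWriter(mobFileWriter);  // failure: discard the temp file
      }
    }
    // Success path: append metadata, close, then commit (or abort if no cells were written).
    commitIfNonEmpty(mobFileWriter, mobCells);
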
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 77f167e..3c6a071 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mob; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; @@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; +import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StringUtils; @@ -112,14 +115,23 @@ public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOExceptio synchronized (flushLock) { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk - writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(), - false, true, true, false/*default for dropbehind*/, snapshot.getTimeRangeTracker()); + writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompressionType(), + false, true, true, false, snapshot.getTimeRangeTracker()); + IOException e = null; try { // It's a mob store, flush the cells in a mob way. This is the difference of flushing // between a normal and a mob store. - performMobFlush(snapshot, cacheFlushId, scanner, writer, status); + performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController); + } catch (IOException ioe) { + e = ioe; + // throw the exception out + throw ioe; } finally { - finalizeWriter(writer, cacheFlushId, status); + if (e != null) { + writer.close(); + } else { + finalizeWriter(writer, cacheFlushId, status); + } } } } finally {
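
The flushSnapshot change above separates the failure path from the success path by capturing the exception before rethrowing it: a failed flush merely closes the store file writer, while a successful one is finalized with metadata. Reduced to its shape:

    IOException failure = null;
    try {
      performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController);
    } catch (IOException ioe) {
      failure = ioe;
      throw ioe;                                       // still propagated to the caller
    } finally {
      if (failure != null) {
        writer.close();                                // failed flush: no metadata appended
      } else {
        finalizeWriter(writer, cacheFlushId, status);  // successful flush
      }
    }
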
@@ -148,10 +160,12 @@ public DefaultMobStoreFlusher(Configuration conf, Store store) throws IOExceptio * @param scanner The scanner of memstore snapshot. * @param writer The store file writer. * @param status Task that represents the flush operation and may be updated with status. + * @param throughputController A controller to avoid flushing too fast. * @throws IOException */ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, - InternalScanner scanner, StoreFileWriter writer, MonitoredTask status) throws IOException { + InternalScanner scanner, StoreFileWriter writer, MonitoredTask status, + ThroughputController throughputController) throws IOException { StoreFileWriter mobFileWriter = null; int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT); @@ -159,16 +173,21 @@ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, long mobSize = 0; long time = snapshot.getTimeRangeTracker().getMax(); mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), - store.getFamily().getCompression(), store.getRegionInfo().getStartKey()); + store.getFamily().getCompressionType(), store.getRegionInfo().getStartKey(), false); // the target path is {tableName}/.mob/{cfName}/mobFiles // the relative path is mobFiles byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + List<Cell> cells = new ArrayList<>(); + boolean hasMore; + String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); + boolean control = throughputController != null && !store.getRegionInfo().isSystemTable(); + if (control) { + throughputController.start(flushName); + } + IOException ioe = null; try { - List<Cell> cells = new ArrayList<>(); - boolean hasMore; - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - do { hasMore = scanner.next(cells, scannerContext); if (!cells.isEmpty()) { @@ -191,15 +210,28 @@ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, this.mobStore.getRefCellTags()); writer.append(reference); } + int len = KeyValueUtil.length(c); + if (control) { + throughputController.control(flushName, len); + } } cells.clear(); } } while (hasMore); + } catch (InterruptedException e) { + ioe = new InterruptedIOException( + "Interrupted while controlling throughput of flushing " + flushName); + throw ioe; + } catch (IOException e) { + ioe = e; + throw e; } finally { - status.setStatus("Flushing mob file " + store + ": appending metadata"); - mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); - status.setStatus("Flushing mob file " + store + ": closing flushed file"); - mobFileWriter.close(); + if (control) { + throughputController.finish(flushName); + } + if (ioe != null) { + mobFileWriter.close(); + } } if (mobCount > 0) { @@ -207,12 +239,18 @@ protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, // If the mob file is committed successfully but the store file is not, // the committed mob file will be handled by the sweep tool as an unused // file. + status.setStatus("Flushing mob file " + store + ": appending metadata"); + mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); + status.setStatus("Flushing mob file " + store + ": closing flushed file"); + mobFileWriter.close(); mobStore.commitFile(mobFileWriter.getPath(), targetPath); mobStore.updateMobFlushCount(); mobStore.updateMobFlushedCellsCount(mobCount); mobStore.updateMobFlushedCellsSize(mobSize); } else { try { + status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file"); + mobFileWriter.close(); // If the mob file is empty, delete it instead of committing.
store.getFileSystem().delete(mobFileWriter.getPath(), true); } catch (IOException e) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java index 7e46291..8191828 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobUtils.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.crypto.Encryption; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.master.locking.LockManager; @@ -71,6 +72,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -489,8 +491,6 @@ public static Cell createMobRefCell(Cell cell, byte[] fileName, Tag tableNameTag // find the original mob files by this table name. For details please see cloning // snapshot for mob files. tags.add(tableNameTag); - // Add the existing tags. - TagUtil.carryForwardTags(tags, cell); return createMobRefCell(cell, fileName, TagUtil.fromList(tags)); } @@ -511,18 +511,19 @@ public static Cell createMobRefCell(Cell cell, byte[] fileName, byte[] refCellTa * @param startKey The hex string of the start key. * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, HColumnDescriptor family, String date, Path basePath, long maxKeyCount, Compression.Algorithm compression, String startKey, CacheConfig cacheConfig, - Encryption.Context cryptoContext) + Encryption.Context cryptoContext, boolean isCompaction) throws IOException { MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", "")); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, - cacheConfig, cryptoContext); + cacheConfig, cryptoContext, isCompaction); } /** @@ -534,25 +535,19 @@ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, * @param maxKeyCount The key count. * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. 
* @throws IOException */ public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem fs, HColumnDescriptor family, Path basePath, long maxKeyCount, CacheConfig cacheConfig, - Encryption.Context cryptoContext) + Encryption.Context cryptoContext, boolean isCompaction) throws IOException { - HFileContext hFileContext = new HFileContextBuilder().withIncludesMvcc(true) - .withIncludesTags(true).withCompression(family.getCompactionCompression()) - .withCompressTags(family.isCompressTags()).withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize()) - .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) - .withEncryptionContext(cryptoContext).withCreateTime(EnvironmentEdgeManager.currentTime()) - .build(); - Path tempPath = new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")); - StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConfig, fs).withFilePath(tempPath) - .withComparator(CellComparator.COMPARATOR).withBloomType(family.getBloomFilterType()) - .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); - return w; + return createWriter(conf, fs, family, + new Path(basePath, UUID.randomUUID().toString().replaceAll("-", "")), maxKeyCount, + family.getCompactionCompressionType(), cacheConfig, cryptoContext, + HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), family.getBlocksize(), + family.getBloomFilterType(), isCompaction); } /** @@ -567,18 +562,19 @@ public static StoreFileWriter createRefFileWriter(Configuration conf, FileSystem * @param startKey The start key. * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, HColumnDescriptor family, String date, Path basePath, long maxKeyCount, Compression.Algorithm compression, byte[] startKey, CacheConfig cacheConfig, - Encryption.Context cryptoContext) + Encryption.Context cryptoContext, boolean isCompaction) throws IOException { MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID().toString().replaceAll("-", "")); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, - cacheConfig, cryptoContext); + cacheConfig, cryptoContext, isCompaction); } /** @@ -605,7 +601,7 @@ public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem .randomUUID().toString().replaceAll("-", "") + "_del"; MobFileName mobFileName = MobFileName.create(startKey, date, suffix); return createWriter(conf, fs, family, mobFileName, basePath, maxKeyCount, compression, - cacheConfig, cryptoContext); + cacheConfig, cryptoContext, true); } /** @@ -619,26 +615,69 @@ public static StoreFileWriter createDelFileWriter(Configuration conf, FileSystem * @param compression The compression algorithm. * @param cacheConfig The current cache config. * @param cryptoContext The encryption context. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. 
* @throws IOException */ - private static StoreFileWriter createWriter(Configuration conf, FileSystem fs, - HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, - Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext) - throws IOException { + public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, MobFileName mobFileName, Path basePath, long maxKeyCount, + Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, + boolean isCompaction) + throws IOException { + return createWriter(conf, fs, family, + new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, cacheConfig, + cryptoContext, HStore.getChecksumType(conf), HStore.getBytesPerChecksum(conf), + family.getBlocksize(), BloomType.NONE, isCompaction); + } + + /** + * Creates a writer for the mob file in temp directory. + * @param conf The current configuration. + * @param fs The current file system. + * @param family The descriptor of the current column family. + * @param path The path for a temp directory. + * @param maxKeyCount The key count. + * @param compression The compression algorithm. + * @param cacheConfig The current cache config. + * @param cryptoContext The encryption context. + * @param checksumType The checksum type. + * @param bytesPerChecksum The bytes per checksum. + * @param blocksize The HFile block size. + * @param bloomType The bloom filter type. + * @param isCompaction If the writer is used in compaction. + * @return The writer for the mob file. + * @throws IOException + */ + public static StoreFileWriter createWriter(Configuration conf, FileSystem fs, + HColumnDescriptor family, Path path, long maxKeyCount, + Compression.Algorithm compression, CacheConfig cacheConfig, Encryption.Context cryptoContext, + ChecksumType checksumType, int bytesPerChecksum, int blocksize, BloomType bloomType, + boolean isCompaction) + throws IOException { + if (compression == null) { + compression = HFile.DEFAULT_COMPRESSION_ALGORITHM; + } + final CacheConfig writerCacheConf; + if (isCompaction) { + writerCacheConf = new CacheConfig(cacheConfig); + writerCacheConf.setCacheDataOnWrite(false); + } else { + writerCacheConf = cacheConfig; + } HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) - .withIncludesMvcc(true).withIncludesTags(true) - .withCompressTags(family.isCompressTags()) - .withChecksumType(HStore.getChecksumType(conf)) - .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(family.getBlocksize()) - .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) - .withEncryptionContext(cryptoContext) - .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); - - StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConfig, fs) - .withFilePath(new Path(basePath, mobFileName.getFileName())) - .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE) - .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); + .withIncludesMvcc(true).withIncludesTags(true) + .withCompressTags(family.isCompressTags()) + .withChecksumType(checksumType) + .withBytesPerCheckSum(bytesPerChecksum) + .withBlockSize(blocksize) + .withHBaseCheckSum(true).withDataBlockEncoding(family.getDataBlockEncoding()) + .withEncryptionContext(cryptoContext) + .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); + + StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, fs) 
+ .withFilePath(path) + .withComparator(CellComparator.COMPARATOR).withBloomType(bloomType) + .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); return w; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java index 3292d99..b6cf814 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactionRequest.java @@ -26,7 +26,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java index b6eb640..987fe51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java @@ -629,13 +629,14 @@ private void compactMobFilesInBatch(PartitionedMobCompactionRequest request, writer = MobUtils .createWriter(conf, fs, column, partition.getPartitionId().getLatestDate(), tempPath, Long.MAX_VALUE, column.getCompactionCompressionType(), - partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext); + partition.getPartitionId().getStartKey(), compactionCacheConfig, cryptoContext, + true); cleanupTmpMobFile = true; filePath = writer.getPath(); byte[] fileName = Bytes.toBytes(filePath.getName()); // create a temp file and open a writer for it in the bulkloadPath refFileWriter = MobUtils.createRefFileWriter(conf, fs, column, bulkloadColumnPath, - fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext); + fileInfo.getSecond().longValue(), compactionCacheConfig, cryptoContext, true); cleanupBulkloadDirOfPartition = true; List<Cell> cells = new ArrayList<>(); boolean hasMore; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 7f7ecb4..a990ceb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -48,10 +48,7 @@ import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CorruptHFileException; -import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.mob.MobCacheConfig; import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFile; @@ -59,7 +56,6 @@ import org.apache.hadoop.hbase.mob.MobStoreEngine; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import 
org.apache.hadoop.hbase.util.HFileArchiveUtil; import org.apache.hadoop.hbase.util.IdLock; @@ -190,16 +186,19 @@ private Path getTempDir() { * @param maxKeyCount The key count. * @param compression The compression algorithm. * @param startKey The start key. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public StoreFileWriter createWriterInTmp(Date date, long maxKeyCount, - Compression.Algorithm compression, byte[] startKey) throws IOException { + Compression.Algorithm compression, byte[] startKey, + boolean isCompaction) throws IOException { if (startKey == null) { startKey = HConstants.EMPTY_START_ROW; } Path path = getTempDir(); - return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey); + return createWriterInTmp(MobUtils.formatDate(date), path, maxKeyCount, compression, startKey, + isCompaction); } /** @@ -222,7 +221,7 @@ public StoreFileWriter createDelFileWriterInTmp(Date date, long maxKeyCount, String suffix = UUID .randomUUID().toString().replaceAll("-", "") + "_del"; MobFileName mobFileName = MobFileName.create(startKey, MobUtils.formatDate(date), suffix); - return createWriterInTmp(mobFileName, path, maxKeyCount, compression); + return createWriterInTmp(mobFileName, path, maxKeyCount, compression, true); } /** @@ -232,14 +231,16 @@ public StoreFileWriter createDelFileWriterInTmp(Date date, long maxKeyCount, * @param maxKeyCount The key count. * @param compression The compression algorithm. * @param startKey The start key. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. * @throws IOException */ public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKeyCount, - Compression.Algorithm compression, byte[] startKey) throws IOException { + Compression.Algorithm compression, byte[] startKey, + boolean isCompaction) throws IOException { MobFileName mobFileName = MobFileName.create(startKey, date, UUID.randomUUID() .toString().replaceAll("-", "")); - return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression); + return createWriterInTmp(mobFileName, basePath, maxKeyCount, compression, isCompaction); } /** @@ -248,27 +249,16 @@ public StoreFileWriter createWriterInTmp(String date, Path basePath, long maxKey * @param basePath The basic path for a temp directory. * @param maxKeyCount The key count. * @param compression The compression algorithm. + * @param isCompaction If the writer is used in compaction. * @return The writer for the mob file. 
* @throws IOException */ public StoreFileWriter createWriterInTmp(MobFileName mobFileName, Path basePath, - long maxKeyCount, Compression.Algorithm compression) throws IOException { - final CacheConfig writerCacheConf = mobCacheConfig; - HFileContext hFileContext = new HFileContextBuilder().withCompression(compression) - .withIncludesMvcc(true).withIncludesTags(true) - .withCompressTags(family.isCompressTags()) - .withChecksumType(checksumType) - .withBytesPerCheckSum(bytesPerChecksum) - .withBlockSize(blocksize) - .withHBaseCheckSum(true).withDataBlockEncoding(getFamily().getDataBlockEncoding()) - .withEncryptionContext(cryptoContext) - .withCreateTime(EnvironmentEdgeManager.currentTime()).build(); - - StoreFileWriter w = new StoreFileWriter.Builder(conf, writerCacheConf, region.getFilesystem()) - .withFilePath(new Path(basePath, mobFileName.getFileName())) - .withComparator(CellComparator.COMPARATOR).withBloomType(BloomType.NONE) - .withMaxKeyCount(maxKeyCount).withFileContext(hFileContext).build(); - return w; + long maxKeyCount, Compression.Algorithm compression, + boolean isCompaction) throws IOException { + return MobUtils.createWriter(conf, region.getFilesystem(), family, + new Path(basePath, mobFileName.getFileName()), maxKeyCount, compression, mobCacheConfig, + cryptoContext, checksumType, bytesPerChecksum, blocksize, BloomType.NONE, isCompaction); } /** @@ -412,7 +402,7 @@ private Cell readCell(List<Path> locations, String fileName, Cell search, boolea throwable = e; if ((e instanceof FileNotFoundException) || (e.getCause() instanceof FileNotFoundException)) { - LOG.warn("Fail to read the cell, the mob file " + path + " doesn't exist", e); + LOG.debug("Fail to read the cell, the mob file " + path + " doesn't exist", e); } else if (e instanceof CorruptHFileException) { LOG.error("The mob file " + path + " is corrupt", e); break; @@ -421,11 +411,11 @@ } } catch (NullPointerException e) { // HDFS 1.x - DFSInputStream.getBlockAt() mobCacheConfig.getMobFileCache().evictFile(fileName); - LOG.warn("Fail to read the cell", e); + LOG.debug("Fail to read the cell", e); throwable = e; } catch (AssertionError e) { // assert in HDFS 1.x - DFSInputStream.getBlockAt() mobCacheConfig.getMobFileCache().evictFile(fileName); - LOG.warn("Fail to read the cell", e); + LOG.debug("Fail to read the cell", e); throwable = e; } finally { if (file != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 84253c8..4d2fea8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -173,7 +173,7 @@ protected int bytesPerChecksum; // Comparing KeyValues - private final CellComparator comparator; + protected final CellComparator comparator; final StoreEngine storeEngine;
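
Every writer-creation path in HMobStore and MobUtils now threads an isCompaction flag down to the consolidated MobUtils.createWriter. Its effect, taken from that method earlier in this patch, is to disable cache-on-write for compaction writers so freshly written blocks do not churn the block cache:

    final CacheConfig writerCacheConf;
    if (isCompaction) {
      writerCacheConf = new CacheConfig(cacheConfig);  // copy, then adjust only the copy
      writerCacheConf.setCacheDataOnWrite(false);
    } else {
      writerCacheConf = cacheConfig;                   // flush writers keep the shared config
    }
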
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index e1c08bd..7f7c045 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -68,7 +68,7 @@ @InterfaceAudience.Private public abstract class Compactor { private static final Log LOG = LogFactory.getLog(Compactor.class); - private static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; + protected static final long COMPACTION_PROGRESS_LOG_INTERVAL = 60 * 1000; protected volatile CompactionProgress progress; protected final Configuration conf; protected final Store store; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index aaee994..724761a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2286,7 +2286,6 @@ public int countRows(final Region region, final Scan scan) throws IOException { } public int countRows(final InternalScanner scanner) throws IOException { - // Do not retrieve the mob data when scanning int scannedCount = 0; List<Cell> results = new ArrayList<>(); boolean hasMore = true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java index 9abe040..d428986 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobFileCache.java @@ -134,7 +134,7 @@ private Path createMobStoreFile(HColumnDescriptor hcd) int maxKeyCount = keys.length; HRegionInfo regionInfo = new HRegionInfo(tn); StoreFileWriter mobWriter = mobStore.createWriterInTmp(currentDate, - maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey()); + maxKeyCount, hcd.getCompactionCompression(), regionInfo.getStartKey(), false); Path mobFilePath = mobWriter.getPath(); String fileName = mobFilePath.getName(); mobWriter.append(key1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java index f0584d5..1a16a4f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java @@ -330,38 +330,6 @@ public void testMinorCompaction() throws Exception { countFiles(tableName, false, family2)); } - private void waitUntilFilesShowup(final TableName table, final String famStr, final int num) - throws InterruptedException, IOException { - - HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0); - - // Make sure that it is flushed. - FileSystem fs = r.getRegionFileSystem().getFileSystem(); - Path path = r.getRegionFileSystem().getStoreDir(famStr); - - - FileStatus[] fileList = fs.listStatus(path); - - while (fileList.length != num) { - Thread.sleep(50); - fileList = fs.listStatus(path); - } - } - - private int numberOfMobFiles(final TableName table, final String famStr) - throws IOException { - - HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(table).get(0); - - // Make sure that it is flushed. - FileSystem fs = r.getRegionFileSystem().getFileSystem(); - Path path = r.getRegionFileSystem().getStoreDir(famStr); - - FileStatus[] fileList = fs.listStatus(path); - - return fileList.length; - } - @Test public void testMinorCompactionWithWeeklyPolicy() throws Exception { resetConf(); @@ -766,20 +734,6 @@ public void preCompactSelection(final ObserverContext
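
The HBaseAdmin javadoc touched at the top of this patch documents the CompactType argument that routes a request to MOB files instead of regular store files. A hypothetical client-side trigger, assuming the public Admin overloads that delegate to the private compact(...) shown there (table and family names are placeholders):

    // Sketch only: request a major MOB compaction of family "f1" on table "t1".
    static void triggerMobCompaction(Admin admin) throws Exception {
      admin.majorCompact(TableName.valueOf("t1"), Bytes.toBytes("f1"), CompactType.MOB);
    }
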