diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java deleted file mode 100644 index 01c195a..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ /dev/null @@ -1,370 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.mob; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.PrivateCellUtil; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.regionserver.CellSink; -import org.apache.hadoop.hbase.regionserver.HMobStore; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.ScanInfo; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.ShipperListener; -import org.apache.hadoop.hbase.regionserver.StoreFileScanner; -import org.apache.hadoop.hbase.regionserver.StoreFileWriter; -import org.apache.hadoop.hbase.regionserver.StoreScanner; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; -import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Compact passed set of files in the mob-enabled column family. 
- */ -@InterfaceAudience.Private -public class DefaultMobStoreCompactor extends DefaultCompactor { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreCompactor.class); - private long mobSizeThreshold; - private HMobStore mobStore; - - private final InternalScannerFactory scannerFactory = new InternalScannerFactory() { - - @Override - public ScanType getScanType(CompactionRequestImpl request) { - // retain the delete markers until they are expired. - return ScanType.COMPACT_RETAIN_DELETES; - } - - @Override - public InternalScanner createScanner(ScanInfo scanInfo, List scanners, - ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { - return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, - fd.earliestPutTs); - } - }; - - private final CellSinkFactory writerFactory = - new CellSinkFactory() { - @Override - public StoreFileWriter createWriter(InternalScanner scanner, - org.apache.hadoop.hbase.regionserver.compactions.Compactor.FileDetails fd, - boolean shouldDropBehind) throws IOException { - // make this writer with tags always because of possible new cells with tags. - return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, true, true, - shouldDropBehind); - } - }; - - public DefaultMobStoreCompactor(Configuration conf, HStore store) { - super(conf, store); - // The mob cells reside in the mob-enabled column family which is held by HMobStore. - // During the compaction, the compactor reads the cells from the mob files and - // probably creates new mob files. All of these operations are included in HMobStore, - // so we need to cast the Store to HMobStore. 
- if (!(store instanceof HMobStore)) { - throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); - } - mobStore = (HMobStore) store; - mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); - } - - @Override - public List compact(CompactionRequestImpl request, ThroughputController throughputController, - User user) throws IOException { - return compact(request, scannerFactory, writerFactory, throughputController, user); - } - - /** - * Performs compaction on a column family with the mob flag enabled. - * This is for when the mob threshold size has changed or if the mob - * column family mode has been toggled via an alter table statement. - * Compacts the files by the following rules. - * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file. - *
    - *
  1. - * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, - * directly copy the (with mob tag) cell into the new store file. - *
  2. - *
  3. - * Otherwise, retrieve the mob cell from the mob file, and writes a copy of the cell into - * the new store file. - *
  4. - *
- * 2. If the Put cell doesn't have a reference tag. - *
    - *
  1. - * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, - * write this cell to a mob file, and write the path of this mob file to the store file. - *
  2. - *
  3. - * Otherwise, directly write this cell into the store file. - *
  4. - *
- * 3. Decide how to write a Delete cell. - *
    - *
  1. - * If a Delete cell does not have a mob reference tag which means this delete marker have not - * been written to the mob del file, write this cell to the mob del file, and write this cell - * with a ref tag to a store file. - *
  2. - *
  3. - * Otherwise, directly write it to a store file. - *
  4. - *
- * After the major compaction on the normal hfiles, we have a guarantee that we have purged all - * deleted or old version mob refs, and the delete markers are written to a del file with the - * suffix _del. Because of this, it is safe to use the del file in the mob compaction. - * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the - * mob files. When the small mob files are merged into bigger ones, the del file is added into - * the scanner to filter the deleted cells. - * @param fd File details - * @param scanner Where to read from. - * @param writer Where to write to. - * @param smallestReadPoint Smallest read point. - * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint - * @param throughputController The compaction throughput controller. - * @param major Is a major compaction. - * @param numofFilesToCompact the number of files to compact - * @return Whether compaction ended; false if it was interrupted for any reason. - */ - @Override - protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, - long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, - boolean major, int numofFilesToCompact) throws IOException { - long bytesWrittenProgressForCloseCheck = 0; - long bytesWrittenProgressForLog = 0; - long bytesWrittenProgressForShippedCall = 0; - // Since scanner.next() can return 'false' but still be delivering data, - // we have to use a do/while loop. 
- List cells = new ArrayList<>(); - // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME - int closeCheckSizeLimit = HStore.getCloseCheckInterval(); - long lastMillis = 0; - if (LOG.isDebugEnabled()) { - lastMillis = EnvironmentEdgeManager.currentTime(); - } - String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); - long now = 0; - boolean hasMore; - Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); - byte[] fileName = null; - StoreFileWriter mobFileWriter = null, delFileWriter = null; - long mobCells = 0, deleteMarkersCount = 0; - long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; - long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; - boolean finished = false; - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - throughputController.start(compactionName); - KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; - long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); - try { - try { - // If the mob file writer could not be created, directly write the cell to the store file. 
- mobFileWriter = mobStore.createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, - compactionCompression, store.getRegionInfo().getStartKey(), true); - fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); - } catch (IOException e) { - LOG.warn("Failed to create mob writer, " - + "we will continue the compaction by writing MOB cells directly in store files", e); - } - if (major) { - try { - delFileWriter = mobStore.createDelFileWriterInTmp(new Date(fd.latestPutTs), - fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey()); - } catch (IOException e) { - LOG.warn( - "Failed to create del writer, " - + "we will continue the compaction by writing delete markers directly in store files", - e); - } - } - do { - hasMore = scanner.next(cells, scannerContext); - if (LOG.isDebugEnabled()) { - now = EnvironmentEdgeManager.currentTime(); - } - for (Cell c : cells) { - if (major && CellUtil.isDelete(c)) { - if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) { - // Directly write it to a store file - writer.append(c); - } else { - // Add a ref tag to this cell and write it to a store file. - writer.append(MobUtils.createMobRefDeleteMarker(c)); - // Write the cell to a del file - delFileWriter.append(c); - deleteMarkersCount++; - } - } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { - // If the mob file writer is null or the kv type is not put, directly write the cell - // to the store file. - writer.append(c); - } else if (MobUtils.isMobReferenceCell(c)) { - if (MobUtils.hasValidMobRefCellValue(c)) { - int size = MobUtils.getMobValueLength(c); - if (size > mobSizeThreshold) { - // If the value size is larger than the threshold, it's regarded as a mob. Since - // its value is already in the mob file, directly write this cell to the store file - writer.append(c); - } else { - // If the value is not larger than the threshold, it's not regarded a mob. 
Retrieve - // the mob cell from the mob file, and write it back to the store file. - Cell mobCell = mobStore.resolve(c, false); - if (mobCell.getValueLength() != 0) { - // put the mob data back to the store file - PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); - writer.append(mobCell); - cellsCountCompactedFromMob++; - cellsSizeCompactedFromMob += mobCell.getValueLength(); - } else { - // If the value of a file is empty, there might be issues when retrieving, - // directly write the cell to the store file, and leave it to be handled by the - // next compaction. - writer.append(c); - } - } - } else { - LOG.warn("The value format of the KeyValue " + c - + " is wrong, its length is less than " + Bytes.SIZEOF_INT); - writer.append(c); - } - } else if (c.getValueLength() <= mobSizeThreshold) { - //If value size of a cell is not larger than the threshold, directly write to store file - writer.append(c); - } else { - // If the value size of a cell is larger than the threshold, it's regarded as a mob, - // write this cell to a mob file, and write the path to the store file. - mobCells++; - // append the original keyValue in the mob file. - mobFileWriter.append(c); - Cell reference = MobUtils.createMobRefCell(c, fileName, - this.mobStore.getRefCellTags()); - // write the cell whose value is the path of a mob file to the store file. 
- writer.append(reference); - cellsCountCompactedToMob++; - cellsSizeCompactedToMob += c.getValueLength(); - } - int len = KeyValueUtil.length(c); - ++progress.currentCompactedKVs; - progress.totalCompactedSize += len; - bytesWrittenProgressForShippedCall += len; - if (LOG.isDebugEnabled()) { - bytesWrittenProgressForLog += len; - } - throughputController.control(compactionName, len); - // check periodically to see if a system stop is requested - if (closeCheckSizeLimit > 0) { - bytesWrittenProgressForCloseCheck += len; - if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { - bytesWrittenProgressForCloseCheck = 0; - if (!store.areWritesEnabled()) { - progress.cancel(); - return false; - } - } - } - if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { - ((ShipperListener)writer).beforeShipped(); - kvs.shipped(); - bytesWrittenProgressForShippedCall = 0; - } - } - // Log the progress of long running compactions every minute if - // logging at DEBUG level - if (LOG.isDebugEnabled()) { - if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { - LOG.debug("Compaction progress: " - + compactionName - + " " - + progress - + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0) - / ((now - lastMillis) / 1000.0)) + ", throughputController is " - + throughputController); - lastMillis = now; - bytesWrittenProgressForLog = 0; - } - } - cells.clear(); - } while (hasMore); - finished = true; - } catch (InterruptedException e) { - progress.cancel(); - throw new InterruptedIOException( - "Interrupted while control throughput of compacting " + compactionName); - } finally { - throughputController.finish(compactionName); - if (!finished && mobFileWriter != null) { - abortWriter(mobFileWriter); - } - if (!finished && delFileWriter != null) { - abortWriter(delFileWriter); - } - } - if (delFileWriter != null) { - if (deleteMarkersCount > 0) { - // If the del file is not empty, commit it. 
- // If the commit fails, the compaction is re-performed again. - delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); - delFileWriter.close(); - mobStore.commitFile(delFileWriter.getPath(), path); - } else { - // If the del file is empty, delete it instead of committing. - abortWriter(delFileWriter); - } - } - if (mobFileWriter != null) { - if (mobCells > 0) { - // If the mob file is not empty, commit it. - mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); - mobFileWriter.close(); - mobStore.commitFile(mobFileWriter.getPath(), path); - } else { - // If the mob file is empty, delete it instead of committing. - abortWriter(mobFileWriter); - } - } - mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); - mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob); - mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); - mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); - progress.complete(); - return true; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java deleted file mode 100644 index a932dad..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ /dev/null @@ -1,263 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mob; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; -import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; -import org.apache.hadoop.hbase.regionserver.HMobStore; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot; -import org.apache.hadoop.hbase.regionserver.ScannerContext; -import org.apache.hadoop.hbase.regionserver.StoreFileWriter; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; -import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * An implementation of the StoreFlusher. It extends the DefaultStoreFlusher. 
- * If the store is not a mob store, the flusher flushes the MemStore the same with - * DefaultStoreFlusher, - * If the store is a mob store, the flusher flushes the MemStore into two places. - * One is the store files of HBase, the other is the mob files. - *
    - *
  1. Cells that are not PUT type or have the delete mark will be directly flushed to HBase.
  2. - *
  3. If the size of a cell value is larger than a threshold, it'll be flushed - * to a mob file, another cell with the path of this file will be flushed to HBase.
  4. - *
  5. If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to - * HBase directly.
  6. - *
- * - */ -@InterfaceAudience.Private -public class DefaultMobStoreFlusher extends DefaultStoreFlusher { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultMobStoreFlusher.class); - private final Object flushLock = new Object(); - private long mobCellValueSizeThreshold = 0; - private Path targetPath; - private HMobStore mobStore; - - public DefaultMobStoreFlusher(Configuration conf, HStore store) throws IOException { - super(conf, store); - if (!(store instanceof HMobStore)) { - throw new IllegalArgumentException("The store " + store + " is not a HMobStore"); - } - mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); - this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(), - store.getColumnFamilyName()); - if (!this.store.getFileSystem().exists(targetPath)) { - this.store.getFileSystem().mkdirs(targetPath); - } - this.mobStore = (HMobStore) store; - } - - /** - * Flushes the snapshot of the MemStore. - * If this store is not a mob store, flush the cells in the snapshot to store files of HBase. - * If the store is a mob one, the flusher flushes the MemStore into two places. - * One is the store files of HBase, the other is the mob files. - *
    - *
  1. Cells that are not PUT type or have the delete mark will be directly flushed to - * HBase.
  2. - *
  3. If the size of a cell value is larger than a threshold, it'll be - * flushed to a mob file, another cell with the path of this file will be flushed to HBase.
  4. - *
  5. If the size of a cell value is smaller than or equal with a threshold, it'll be flushed to - * HBase directly.
  6. - *
- */ - @Override - public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status, ThroughputController throughputController, - FlushLifeCycleTracker tracker) throws IOException { - ArrayList result = new ArrayList<>(); - long cellsCount = snapshot.getCellsCount(); - if (cellsCount == 0) return result; // don't flush if there are no entries - - // Use a store scanner to find which rows to flush. - long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); - StoreFileWriter writer; - try { - // TODO: We can fail in the below block before we complete adding this flush to - // list of store files. Add cleanup of anything put on filesystem if we fail. - synchronized (flushLock) { - status.setStatus("Flushing " + store + ": creating writer"); - // Write the map out to the disk - writer = store.createWriterInTmp(cellsCount, store.getColumnFamilyDescriptor().getCompressionType(), - false, true, true, false); - IOException e = null; - try { - // It's a mob store, flush the cells in a mob way. This is the difference of flushing - // between a normal and a mob store. - performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController); - } catch (IOException ioe) { - e = ioe; - // throw the exception out - throw ioe; - } finally { - if (e != null) { - writer.close(); - } else { - finalizeWriter(writer, cacheFlushId, status); - } - } - } - } finally { - scanner.close(); - } - LOG.info("Mob store is flushed, sequenceid=" + cacheFlushId + ", memsize=" - + StringUtils.TraditionalBinaryPrefix.long2String(snapshot.getDataSize(), "", 1) + - ", hasBloomFilter=" + writer.hasGeneralBloom() + - ", into tmp file " + writer.getPath()); - result.add(writer.getPath()); - return result; - } - - /** - * Flushes the cells in the mob store. - *
    In the mob store, the cells with PUT type might have or have no mob tags. - *
  1. If a cell does not have a mob tag, flushing the cell to different files depends - * on the value length. If the length is larger than a threshold, it's flushed to a - * mob file and the mob file is flushed to a store file in HBase. Otherwise, directly - * flush the cell to a store file in HBase.
  2. - *
  3. If a cell have a mob tag, its value is a mob file name, directly flush it - * to a store file in HBase.
  4. - *
- * @param snapshot Memstore snapshot. - * @param cacheFlushId Log cache flush sequence number. - * @param scanner The scanner of memstore snapshot. - * @param writer The store file writer. - * @param status Task that represents the flush operation and may be updated with status. - * @param throughputController A controller to avoid flush too fast. - * @throws IOException - */ - protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, - InternalScanner scanner, StoreFileWriter writer, MonitoredTask status, - ThroughputController throughputController) throws IOException { - StoreFileWriter mobFileWriter = null; - int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, - HConstants.COMPACTION_KV_MAX_DEFAULT); - long mobCount = 0; - long mobSize = 0; - long time = snapshot.getTimeRangeTracker().getMax(); - mobFileWriter = mobStore.createWriterInTmp(new Date(time), snapshot.getCellsCount(), - store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), false); - // the target path is {tableName}/.mob/{cfName}/mobFiles - // the relative path is mobFiles - byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); - ScannerContext scannerContext = - ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); - List cells = new ArrayList<>(); - boolean hasMore; - String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); - boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); - if (control) { - throughputController.start(flushName); - } - IOException ioe = null; - try { - do { - hasMore = scanner.next(cells, scannerContext); - if (!cells.isEmpty()) { - for (Cell c : cells) { - // If we know that this KV is going to be included always, then let us - // set its memstoreTS to 0. This will help us save space when writing to - // disk. 
- if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c) - || c.getTypeByte() != KeyValue.Type.Put.getCode()) { - writer.append(c); - } else { - // append the original keyValue in the mob file. - mobFileWriter.append(c); - mobSize += c.getValueLength(); - mobCount++; - - // append the tags to the KeyValue. - // The key is same, the value is the filename of the mob file - Cell reference = MobUtils.createMobRefCell(c, fileName, - this.mobStore.getRefCellTags()); - writer.append(reference); - } - int len = KeyValueUtil.length(c); - if (control) { - throughputController.control(flushName, len); - } - } - cells.clear(); - } - } while (hasMore); - } catch (InterruptedException e) { - ioe = new InterruptedIOException( - "Interrupted while control throughput of flushing " + flushName); - throw ioe; - } catch (IOException e) { - ioe = e; - throw e; - } finally { - if (control) { - throughputController.finish(flushName); - } - if (ioe != null) { - mobFileWriter.close(); - } - } - - if (mobCount > 0) { - // commit the mob file from temp folder to target folder. - // If the mob file is committed successfully but the store file is not, - // the committed mob file will be handled by the sweep tool as an unused - // file. - status.setStatus("Flushing mob file " + store + ": appending metadata"); - mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); - status.setStatus("Flushing mob file " + store + ": closing flushed file"); - mobFileWriter.close(); - mobStore.commitFile(mobFileWriter.getPath(), targetPath); - mobStore.updateMobFlushCount(); - mobStore.updateMobFlushedCellsCount(mobCount); - mobStore.updateMobFlushedCellsSize(mobSize); - } else { - try { - status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file"); - mobFileWriter.close(); - // If the mob file is empty, delete it instead of committing. 
- store.getFileSystem().delete(mobFileWriter.getPath(), true); - } catch (IOException e) { - LOG.error("Failed to delete the temp mob file", e); - } - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java deleted file mode 100644 index ee1fe7d..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/MobStoreEngine.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.mob; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.yetus.audience.InterfaceAudience; - -/** - * MobStoreEngine creates the mob specific compactor, and store flusher. 
- */ -@InterfaceAudience.Private -public class MobStoreEngine extends DefaultStoreEngine { - - @Override - protected void createStoreFlusher(Configuration conf, HStore store) throws IOException { - // When using MOB, we use DefaultMobStoreFlusher always - // Just use the compactor and compaction policy as that in DefaultStoreEngine. We can have MOB - // specific compactor and policy when that is implemented. - storeFlusher = new DefaultMobStoreFlusher(conf, store); - } - - /** - * Creates the DefaultMobCompactor. - */ - @Override - protected void createCompactor(Configuration conf, HStore store) throws IOException { - compactor = new DefaultMobStoreCompactor(conf, store); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index b3f0a44..8131cea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -39,7 +39,7 @@ private static final Logger LOG = LoggerFactory.getLogger(DefaultStoreFlusher.class); private final Object flushLock = new Object(); - public DefaultStoreFlusher(Configuration conf, HStore store) { + public DefaultStoreFlusher(Configuration conf, HStore store) throws IOException { super(conf, store); } @@ -68,7 +68,13 @@ /* shouldDropBehind = */ false); IOException e = null; try { - performFlush(scanner, writer, smallestReadPoint, throughputController); + if(store instanceof HMobStore) { + // It's a mob store, flush the cells in a mob way. This is the difference of flushing + // between a normal and a mob store. 
+ performMobFlush(snapshot, cacheFlushId, scanner, writer, status, throughputController); + } else { + performFlush(scanner, writer, smallestReadPoint, throughputController); + } } catch (IOException ioe) { e = ioe; // throw the exception out diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index d56a1c2..dd5fb74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderType; -import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HConstants; @@ -51,7 +50,6 @@ import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobFile; import org.apache.hadoop.hbase.mob.MobFileName; -import org.apache.hadoop.hbase.mob.MobStoreEngine; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.HFileArchiveUtil; @@ -157,17 +155,6 @@ } return scan.isReversed() ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt) : new MobStoreScanner(this, scanInfo, scan, targetCols, readPt); - } - - /** - * Creates the mob store engine. 
- */ - @Override - protected StoreEngine createStoreEngine(HStore store, Configuration conf, - CellComparator cellComparator) throws IOException { - MobStoreEngine engine = new MobStoreEngine(); - engine.createComponents(conf, store, cellComparator); - return engine; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 442d47d..edf05ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -21,17 +21,23 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.Date; import java.util.List; import java.util.OptionalInt; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; /** @@ -40,12 +46,23 @@ */ @InterfaceAudience.Private abstract class StoreFlusher { + private static final Log LOG = LogFactory.getLog(StoreFlusher.class); protected Configuration conf; protected HStore store; + private long mobCellValueSizeThreshold = 0; + private Path targetPath; - public StoreFlusher(Configuration conf, HStore store) { + public StoreFlusher(Configuration conf, HStore store) throws IOException { this.conf = conf; this.store = store; + 
if(store.getColumnFamilyDescriptor().isMobEnabled()) { + this.mobCellValueSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); + this.targetPath = MobUtils.getMobFamilyPath(conf, store.getTableName(), + store.getColumnFamilyName()); + if (!this.store.getFileSystem().exists(targetPath)) { + this.store.getFileSystem().mkdirs(targetPath); + } + } } /** @@ -149,4 +166,118 @@ } } } + + /** + * Flushes the cells in the mob store. + *
    In the mob store, the cells with PUT type might or might not have mob tags. + *
  1. If a cell does not have a mob tag, flushing the cell to different files depends + * on the value length. If the length is larger than a threshold, it's flushed to a + * mob file and a reference to the mob file is flushed to a store file in HBase. Otherwise, + * directly flush the cell to a store file in HBase.
  2. + *
  3. If a cell has a mob tag, its value is a mob file name; directly flush it + * to a store file in HBase.
  4. + *
+ * @param snapshot Memstore snapshot. + * @param cacheFlushId Log cache flush sequence number. + * @param scanner The scanner of memstore snapshot. + * @param writer The store file writer. + * @param status Task that represents the flush operation and may be updated with status. + * @param throughputController A controller to avoid flush too fast. + * @throws IOException + */ + protected void performMobFlush(MemStoreSnapshot snapshot, long cacheFlushId, + InternalScanner scanner, CellSink writer, MonitoredTask status, + ThroughputController throughputController) throws IOException { + StoreFileWriter mobFileWriter = null; + int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, + HConstants.COMPACTION_KV_MAX_DEFAULT); + long mobCount = 0; + long mobSize = 0; + long time = snapshot.getTimeRangeTracker().getMax(); + mobFileWriter = ((HMobStore)store).createWriterInTmp(new Date(time), snapshot.getCellsCount(), + store.getColumnFamilyDescriptor().getCompressionType(), store.getRegionInfo().getStartKey(), false); + // the target path is {tableName}/.mob/{cfName}/mobFiles + // the relative path is mobFiles + byte[] fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + List cells = new ArrayList<>(); + boolean hasMore; + String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush"); + boolean control = throughputController != null && !store.getRegionInfo().getTable().isSystemTable(); + if (control) { + throughputController.start(flushName); + } + IOException ioe = null; + try { + do { + hasMore = scanner.next(cells, scannerContext); + if (!cells.isEmpty()) { + for (Cell c : cells) { + // If we know that this KV is going to be included always, then let us + // set its memstoreTS to 0. This will help us save space when writing to + // disk. 
+ if (c.getValueLength() <= mobCellValueSizeThreshold || MobUtils.isMobReferenceCell(c) + || c.getTypeByte() != KeyValue.Type.Put.getCode()) { + writer.append(c); + } else { + // append the original keyValue in the mob file. + mobFileWriter.append(c); + mobSize += c.getValueLength(); + mobCount++; + + // append the tags to the KeyValue. + // The key is same, the value is the filename of the mob file + Cell reference = MobUtils.createMobRefCell(c, fileName, + ((HMobStore)store).getRefCellTags()); + writer.append(reference); + } + int len = KeyValueUtil.length(c); + if (control) { + throughputController.control(flushName, len); + } + } + cells.clear(); + } + } while (hasMore); + } catch (InterruptedException e) { + ioe = new InterruptedIOException( + "Interrupted while control throughput of flushing " + flushName); + throw ioe; + } catch (IOException e) { + ioe = e; + throw e; + } finally { + if (control) { + throughputController.finish(flushName); + } + if (ioe != null) { + mobFileWriter.close(); + } + } + + if (mobCount > 0) { + // commit the mob file from temp folder to target folder. + // If the mob file is committed successfully but the store file is not, + // the committed mob file will be handled by the sweep tool as an unused + // file. + status.setStatus("Flushing mob file " + store + ": appending metadata"); + mobFileWriter.appendMetadata(cacheFlushId, false, mobCount); + status.setStatus("Flushing mob file " + store + ": closing flushed file"); + mobFileWriter.close(); + ((HMobStore)store).commitFile(mobFileWriter.getPath(), targetPath); + ((HMobStore)store).updateMobFlushCount(); + ((HMobStore)store).updateMobFlushedCellsCount(mobCount); + ((HMobStore)store).updateMobFlushedCellsSize(mobSize); + } else { + try { + status.setStatus("Flushing mob file " + store + ": no mob cells, closing flushed file"); + mobFileWriter.close(); + // If the mob file is empty, delete it instead of committing. 
+ store.getFileSystem().delete(mobFileWriter.getPath(), true); + } catch (IOException e) { + LOG.error("Failed to delete the temp mob file", e); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 595231f..97e09c4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -47,7 +47,7 @@ private final StripeCompactionPolicy.StripeInformationProvider stripes; public StripeStoreFlusher(Configuration conf, HStore store, - StripeCompactionPolicy policy, StripeStoreFileManager stripes) { + StripeCompactionPolicy policy, StripeStoreFileManager stripes) throws IOException { super(conf, store); this.policy = policy; this.stripes = stripes; @@ -77,7 +77,11 @@ mw.init(storeScanner, factory); synchronized (flushLock) { - performFlush(scanner, mw, smallestReadPoint, throughputController); + if(this.store instanceof HMobStore){ + performMobFlush(snapshot, cacheFlushSeqNum, scanner, mw, status, throughputController); + } else { + performFlush(scanner, mw, smallestReadPoint, throughputController); + } result = mw.commitWriters(cacheFlushSeqNum, false); success = true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java index a8ffc2e..7c516b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/AbstractMultiOutputCompactor.java @@ -18,10 +18,7 @@ package org.apache.hadoop.hbase.regionserver.compactions; import java.io.IOException; - import 
org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter; import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory; import org.apache.hadoop.hbase.regionserver.HStore; @@ -29,8 +26,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Base class for implementing a Compactor which will generate multiple output files after @@ -39,8 +34,6 @@ @InterfaceAudience.Private public abstract class AbstractMultiOutputCompactor extends Compactor { - - private static final Logger LOG = LoggerFactory.getLogger(AbstractMultiOutputCompactor.class); public AbstractMultiOutputCompactor(Configuration conf, HStore store) { super(conf, store); @@ -60,17 +53,4 @@ writer.init(storeScanner, writerFactory); } - @Override - protected void abortWriter(T writer) throws IOException { - FileSystem fs = store.getFileSystem(); - for (Path leftoverFile : writer.abortWriters()) { - try { - fs.delete(leftoverFile, false); - } catch (IOException e) { - LOG.warn( - "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.", - e); - } - } - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index ed4a025..f4e5747 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -26,20 +26,25 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Map; 
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; +import org.apache.hadoop.hbase.mob.MobUtils; +import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter; import org.apache.hadoop.hbase.regionserver.CellSink; -import org.apache.hadoop.hbase.regionserver.CustomizedScanInfoBuilder; +import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -81,6 +86,7 @@ /** specify how many days to keep MVCC values during major compaction **/ protected int keepSeqIdPeriod; + private long mobSizeThreshold; // Configs that drive whether we drop page cache behind compactions protected static final String MAJOR_COMPACTION_DROP_CACHE = @@ -103,9 +109,10 @@ HConstants.MIN_KEEP_SEQID_PERIOD), HConstants.MIN_KEEP_SEQID_PERIOD); this.dropCacheMajor = conf.getBoolean(MAJOR_COMPACTION_DROP_CACHE, true); this.dropCacheMinor = conf.getBoolean(MINOR_COMPACTION_DROP_CACHE, true); + if (store instanceof HMobStore) { + this.mobSizeThreshold = store.getColumnFamilyDescriptor().getMobThreshold(); + } } - - protected interface CellSinkFactory { S createWriter(InternalScanner scanner, FileDetails fd, boolean shouldDropBehind) @@ -251,6 +258,22 @@ } }; + protected final InternalScannerFactory mobScannerFactory = new InternalScannerFactory() { + + @Override + public ScanType getScanType(CompactionRequestImpl request) { + // retain the delete markers until they are expired. 
+ return ScanType.COMPACT_RETAIN_DELETES; + } + + @Override + public InternalScanner createScanner(ScanInfo scanInfo, List scanners, + ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { + return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint, + fd.earliestPutTs); + } + }; + /** * Creates a writer for a new file in a temporary directory. * @param fd The file details. @@ -259,12 +282,14 @@ */ protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException { + boolean includeMVCCReadpoint = (store instanceof HMobStore) ? true : fd.maxMVCCReadpoint > 0; + boolean includesTags = (store instanceof HMobStore) ? true : fd.maxTagsLength > 0; // When all MVCC readpoints are 0, don't write them. // See HBASE-8166, HBASE-12600, and HBASE-13389. return store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, /* isCompaction = */true, - /* includeMVCCReadpoint = */fd.maxMVCCReadpoint > 0, - /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind); + /* includeMVCCReadpoint = */includeMVCCReadpoint, + /* includesTags = */includesTags, shouldDropBehind); } private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType, @@ -326,8 +351,13 @@ cleanSeqId = true; } writer = sinkFactory.createWriter(scanner, fd, dropCache); - finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, - throughputController, request.isAllFiles(), request.getFiles().size()); + if (store instanceof HMobStore) { + finished = performMobCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request.isAllFiles(), request.getFiles().size()); + } else { + finished = performCompaction(fd, scanner, writer, smallestReadPoint, cleanSeqId, + throughputController, request.isAllFiles(), request.getFiles().size()); + } if (!finished) { throw new InterruptedIOException("Aborting compaction of store " + store + " in 
region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); @@ -346,7 +376,35 @@ protected abstract List commitWriter(T writer, FileDetails fd, CompactionRequestImpl request) throws IOException; - protected abstract void abortWriter(T writer) throws IOException; + protected void abortWriter(CellSink writer) throws IOException { + if (writer instanceof StoreFileWriter) { + StoreFileWriter sWriter = (StoreFileWriter) writer; + Path leftoverFile = sWriter.getPath(); + try { + sWriter.close(); + } catch (IOException e) { + LOG.warn("Failed to close the writer after an unfinished compaction.", e); + } + try { + store.getFileSystem().delete(leftoverFile, false); + } catch (IOException e) { + LOG.warn( + "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.", + e); + } + } else if (writer instanceof AbstractMultiFileWriter) { + AbstractMultiFileWriter mWriter = (AbstractMultiFileWriter) writer; + for (Path leftoverFile : mWriter.abortWriters()) { + try { + store.getFileSystem().delete(leftoverFile, false); + } catch (IOException e) { + LOG.warn( + "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.", + e); + } + } + } + } /** * Performs the compaction. @@ -475,6 +533,262 @@ } /** + * Performs compaction on a column family with the mob flag enabled. + * This is for when the mob threshold size has changed or if the mob + * column family mode has been toggled via an alter table statement. + * Compacts the files by the following rules. + * 1. If the Put cell has a mob reference tag, the cell's value is the path of the mob file. + *
    + *
  1. + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * directly copy the cell (with its mob tag) into the new store file.
  2. + *
  3. + * Otherwise, retrieve the mob cell from the mob file, and write a copy of the cell into + * the new store file.
  4. + *
+ * 2. If the Put cell doesn't have a reference tag. + *
    + *
  1. + * If the value size of a cell is larger than the threshold, this cell is regarded as a mob, + * write this cell to a mob file, and write the path of this mob file to the store file. + *
  2. + *
  3. + * Otherwise, directly write this cell into the store file. + *
  4. + *
+ * 3. Decide how to write a Delete cell. + *
    + *
  1. + * If a Delete cell does not have a mob reference tag, which means this delete marker has not + * been written to the mob del file, write this cell to the mob del file, and write this cell + * with a ref tag to a store file.
  2. + *
  3. + * Otherwise, directly write it to a store file. + *
  4. + *
+ * After the major compaction on the normal hfiles, we have a guarantee that we have purged all + * deleted or old version mob refs, and the delete markers are written to a del file with the + * suffix _del. Because of this, it is safe to use the del file in the mob compaction. + * The mob compaction doesn't take place in the normal hfiles, it occurs directly in the + * mob files. When the small mob files are merged into bigger ones, the del file is added into + * the scanner to filter the deleted cells. + * @param fd File details + * @param scanner Where to read from. + * @param writer Where to write to. + * @param smallestReadPoint Smallest read point. + * @param cleanSeqId When true, remove seqId(used to be mvcc) value which is <= smallestReadPoint + * @param throughputController The compaction throughput controller. + * @param major Is a major compaction. + * @param numofFilesToCompact the number of files to compact + * @return Whether compaction ended; false if it was interrupted for any reason. + */ + protected boolean performMobCompaction(FileDetails fd, InternalScanner scanner, CellSink writer, + long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController, + boolean major, int numofFilesToCompact) throws IOException { + long bytesWrittenProgressForCloseCheck = 0; + long bytesWrittenProgressForLog = 0; + long bytesWrittenProgressForShippedCall = 0; + // Since scanner.next() can return 'false' but still be delivering data, + // we have to use a do/while loop. 
+ List cells = new ArrayList<>(); + // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME + int closeCheckSizeLimit = HStore.getCloseCheckInterval(); + long lastMillis = 0; + if (LOG.isDebugEnabled()) { + lastMillis = EnvironmentEdgeManager.currentTime(); + } + String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction"); + long now = 0; + boolean hasMore; + Path path = MobUtils.getMobFamilyPath(conf, store.getTableName(), store.getColumnFamilyName()); + byte[] fileName = null; + StoreFileWriter mobFileWriter = null, delFileWriter = null; + long mobCells = 0, deleteMarkersCount = 0; + long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0; + long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0; + boolean finished = false; + ScannerContext scannerContext = + ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); + throughputController.start(compactionName); + KeyValueScanner kvs = (scanner instanceof KeyValueScanner)? (KeyValueScanner)scanner : null; + long shippedCallSizeLimit = (long) numofFilesToCompact * this.store.getColumnFamilyDescriptor().getBlocksize(); + try { + try { + // If the mob file writer could not be created, directly write the cell to the store file. 
+ mobFileWriter = ((HMobStore) store).createWriterInTmp(new Date(fd.latestPutTs), fd.maxKeyCount, + compactionCompression, store.getRegionInfo().getStartKey(), true); + fileName = Bytes.toBytes(mobFileWriter.getPath().getName()); + } catch (IOException e) { + LOG.warn("Failed to create mob writer, " + + "we will continue the compaction by writing MOB cells directly in store files", e); + } + if (major) { + try { + delFileWriter = ((HMobStore) store).createDelFileWriterInTmp(new Date(fd.latestPutTs), + fd.maxKeyCount, compactionCompression, store.getRegionInfo().getStartKey()); + } catch (IOException e) { + LOG.warn( + "Failed to create del writer, " + + "we will continue the compaction by writing delete markers directly in store files", + e); + } + } + do { + hasMore = scanner.next(cells, scannerContext); + if (LOG.isDebugEnabled()) { + now = EnvironmentEdgeManager.currentTime(); + } + for (Cell c : cells) { + if (major && CellUtil.isDelete(c)) { + if (MobUtils.isMobReferenceCell(c) || delFileWriter == null) { + // Directly write it to a store file + writer.append(c); + } else { + // Add a ref tag to this cell and write it to a store file. + writer.append(MobUtils.createMobRefDeleteMarker(c)); + // Write the cell to a del file + delFileWriter.append(c); + deleteMarkersCount++; + } + } else if (mobFileWriter == null || c.getTypeByte() != KeyValue.Type.Put.getCode()) { + // If the mob file writer is null or the kv type is not put, directly write the cell + // to the store file. + writer.append(c); + } else if (MobUtils.isMobReferenceCell(c)) { + if (MobUtils.hasValidMobRefCellValue(c)) { + int size = MobUtils.getMobValueLength(c); + if (size > mobSizeThreshold) { + // If the value size is larger than the threshold, it's regarded as a mob. Since + // its value is already in the mob file, directly write this cell to the store file + writer.append(c); + } else { + // If the value is not larger than the threshold, it's not regarded a mob. 
Retrieve + // the mob cell from the mob file, and write it back to the store file. + Cell mobCell = ((HMobStore) store).resolve(c, false); + if (mobCell.getValueLength() != 0) { + // put the mob data back to the store file + PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId()); + writer.append(mobCell); + cellsCountCompactedFromMob++; + cellsSizeCompactedFromMob += mobCell.getValueLength(); + } else { + // If the value of a file is empty, there might be issues when retrieving, + // directly write the cell to the store file, and leave it to be handled by the + // next compaction. + writer.append(c); + } + } + } else { + LOG.warn("The value format of the KeyValue " + c + + " is wrong, its length is less than " + Bytes.SIZEOF_INT); + writer.append(c); + } + } else if (c.getValueLength() <= mobSizeThreshold) { + //If value size of a cell is not larger than the threshold, directly write to store file + writer.append(c); + } else { + // If the value size of a cell is larger than the threshold, it's regarded as a mob, + // write this cell to a mob file, and write the path to the store file. + mobCells++; + // append the original keyValue in the mob file. + mobFileWriter.append(c); + Cell reference = MobUtils.createMobRefCell(c, fileName, + ((HMobStore) store).getRefCellTags()); + // write the cell whose value is the path of a mob file to the store file. 
+ writer.append(reference); + cellsCountCompactedToMob++; + cellsSizeCompactedToMob += c.getValueLength(); + } + int len = KeyValueUtil.length(c); + ++progress.currentCompactedKVs; + progress.totalCompactedSize += len; + bytesWrittenProgressForShippedCall += len; + if (LOG.isDebugEnabled()) { + bytesWrittenProgressForLog += len; + } + throughputController.control(compactionName, len); + // check periodically to see if a system stop is requested + if (closeCheckSizeLimit > 0) { + bytesWrittenProgressForCloseCheck += len; + if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) { + bytesWrittenProgressForCloseCheck = 0; + if (!store.areWritesEnabled()) { + progress.cancel(); + return false; + } + } + } + if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { + ((ShipperListener)writer).beforeShipped(); + kvs.shipped(); + bytesWrittenProgressForShippedCall = 0; + } + } + // Log the progress of long running compactions every minute if + // logging at DEBUG level + if (LOG.isDebugEnabled()) { + if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) { + LOG.debug("Compaction progress: " + + compactionName + + " " + + progress + + String.format(", rate=%.2f kB/sec", (bytesWrittenProgressForLog / 1024.0) + / ((now - lastMillis) / 1000.0)) + ", throughputController is " + + throughputController); + lastMillis = now; + bytesWrittenProgressForLog = 0; + } + } + cells.clear(); + } while (hasMore); + finished = true; + } catch (InterruptedException e) { + progress.cancel(); + throw new InterruptedIOException( + "Interrupted while control throughput of compacting " + compactionName); + } finally { + throughputController.finish(compactionName); + if (!finished && mobFileWriter != null) { + abortWriter(mobFileWriter); + } + if (!finished && delFileWriter != null) { + abortWriter(delFileWriter); + } + } + if (delFileWriter != null) { + if (deleteMarkersCount > 0) { + // If the del file is not empty, commit it. 
+ // If the commit fails, the compaction is re-performed again. + delFileWriter.appendMetadata(fd.maxSeqId, major, deleteMarkersCount); + delFileWriter.close(); + ((HMobStore) store).commitFile(delFileWriter.getPath(), path); + } else { + // If the del file is empty, delete it instead of committing. + abortWriter(delFileWriter); + } + } + if (mobFileWriter != null) { + if (mobCells > 0) { + // If the mob file is not empty, commit it. + mobFileWriter.appendMetadata(fd.maxSeqId, major, mobCells); + mobFileWriter.close(); + ((HMobStore) store).commitFile(mobFileWriter.getPath(), path); + } else { + // If the mob file is empty, delete it instead of committing. + abortWriter(mobFileWriter); + } + } + ((HMobStore) store).updateCellsCountCompactedFromMob(cellsCountCompactedFromMob); + ((HMobStore) store).updateCellsCountCompactedToMob(cellsCountCompactedToMob); + ((HMobStore) store).updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob); + ((HMobStore) store).updateCellsSizeCompactedToMob(cellsSizeCompactedToMob); + progress.complete(); + return true; + } + /** * @param store store * @param scanners Store file scanners. * @param scanType Scan type. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index 7a398ea..a89088f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -31,8 +32,6 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** @@ -41,7 +40,6 @@ */ @InterfaceAudience.Private public class DefaultCompactor extends Compactor { - private static final Logger LOG = LoggerFactory.getLogger(DefaultCompactor.class); public DefaultCompactor(Configuration conf, HStore store) { super(conf, store); @@ -62,7 +60,11 @@ */ public List compact(final CompactionRequestImpl request, ThroughputController throughputController, User user) throws IOException { - return compact(request, defaultScannerFactory, writerFactory, throughputController, user); + if(store instanceof HMobStore){ + return compact(request, mobScannerFactory, writerFactory, throughputController, user); + } else { + return compact(request, defaultScannerFactory, writerFactory, throughputController, user); + } } /** @@ -91,20 +93,4 @@ return newFiles; } - @Override - protected void abortWriter(StoreFileWriter writer) throws IOException { - Path leftoverFile = 
writer.getPath(); - try { - writer.close(); - } catch (IOException e) { - LOG.warn("Failed to close the writer after an unfinished compaction.", e); - } - try { - store.getFileSystem().delete(leftoverFile, false); - } catch (IOException e) { - LOG.warn( - "Failed to delete the leftover file " + leftoverFile + " after an unfinished compaction.", - e); - } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java index a1a6022..cf2938a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreEngine.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -34,7 +36,7 @@ @Category({RegionServerTests.class, SmallTests.class}) public class TestDefaultStoreEngine { public static class DummyStoreFlusher extends DefaultStoreFlusher { - public DummyStoreFlusher(Configuration conf, HStore store) { + public DummyStoreFlusher(Configuration conf, HStore store) throws IOException{ super(conf, store); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index ededcf3..861af43 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -644,7 +644,7 @@ // Switch between throw and not throw exception in flush static final AtomicBoolean throwExceptionWhenFlushing = new AtomicBoolean(false); - public 
CustomStoreFlusher(Configuration conf, HStore store) { + public CustomStoreFlusher(Configuration conf, HStore store) throws IOException { super(conf, store); }