Index: oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java =================================================================== --- oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java (revision 1570529) +++ oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/FileBlobStore.java (working copy) @@ -219,7 +219,7 @@ } @Override - public boolean deleteChunk(String chunkId) throws Exception { + public boolean deleteChunk(String chunkId, long maxLastModifiedTime) throws Exception { byte[] digest = StringUtils.convertHexToBytes(chunkId); File f = getFile(digest, false); if (!f.exists()) { @@ -228,8 +228,11 @@ old.renameTo(f); f = getFile(digest, false); } - f.delete(); - return mark; + if ((maxLastModifiedTime <= 0) + || FileUtils.isFileOlder(f, maxLastModifiedTime)) { + return f.delete(); + } + return false; } @Override @@ -241,8 +244,8 @@ @Override public boolean apply(@Nullable File input) { if (!input.isDirectory() && ( - (maxLastModifiedTime == 0 || maxLastModifiedTime == -1) || - FileUtils.isFileOlder(input, maxLastModifiedTime))) { + (maxLastModifiedTime <= 0) + || FileUtils.isFileOlder(input, maxLastModifiedTime))) { return true; } return false; Index: oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/GarbageCollectableBlobStore.java =================================================================== --- oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/GarbageCollectableBlobStore.java (revision 1570529) +++ oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/GarbageCollectableBlobStore.java (working copy) @@ -86,11 +86,13 @@ * Delete the blob with the given id. 
* * @param chunkId the chunk id + * @param maxLastModifiedTime + * the max last modified time to consider for deletion * @return true, if successful * @throws Exception * the exception */ - boolean deleteChunk(String chunkId) throws Exception; + boolean deleteChunk(String chunkId, long maxLastModifiedTime) throws Exception; /** * Resolve chunks from the given Id. Index: oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java =================================================================== --- oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java (revision 1570529) +++ oak-blob/src/main/java/org/apache/jackrabbit/oak/spi/blob/MemoryBlobStore.java (working copy) @@ -80,8 +80,11 @@ return count; } + /** + * Ignores the maxLastModifiedTime. + */ @Override - public boolean deleteChunk(String chunkId) throws Exception { + public boolean deleteChunk(String chunkId, long maxLastModifiedTime) throws Exception { BlockId id = new BlockId(StringUtils.convertHexToBytes(chunkId), 0); if (map.containsKey(id)) { map.remove(id); Index: oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStoreTest.java =================================================================== --- oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStoreTest.java (revision 1570529) +++ oak-blob/src/test/java/org/apache/jackrabbit/oak/spi/blob/AbstractBlobStoreTest.java (working copy) @@ -373,7 +373,7 @@ Set ids = createArtifacts(); for (String id : ids) { - store.deleteChunk(id); + store.deleteChunk(id, 0); } Iterator iter = store.getAllChunkIds(0); Index: oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java =================================================================== --- oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java (revision 1570529) +++ oak-commons/src/main/java/org/apache/jackrabbit/oak/commons/sort/ExternalSort.java (working copy) @@ -53,12
+53,16 @@ * You can change the default maximal number of temporary files with the -t flag: java * org/apache/oak/commons/sort/ExternalSort somefile.txt out.txt -t 3 * - * For very large files, you might want to use an appropriate flag to allocate more memory to the - * Java VM: java -Xms2G org/apache/oak/commons/sort/ExternalSort somefile.txt out.txt + * You can change the default maximum memory available with the -m flag: java + * org/apache/oak/commons/sort/ExternalSort somefile.txt out.txt -m 8192 * - * By (in alphabetical order) Philippe Beaudoin, Eleftherios Chetzakis, Jon Elsas, Christan Grant, - * Daniel Haran, Daniel Lemire, Sugumaran Harikrishnan, Jerry Yang, First published: April 2010 - * originally posted at http://lemire.me/blog/archives/2010/04/01/external-memory-sorting-in-java/ + * For very large files, you might want to use an appropriate flag to allocate more memory to + * the Java VM: java -Xms2G org/apache/oak/commons/sort/ExternalSort somefile.txt out.txt + * + * By (in alphabetical order) Philippe Beaudoin, Eleftherios Chetzakis, Jon Elsas, Christan + * Grant, Daniel Haran, Daniel Lemire, Sugumaran Harikrishnan, Jerry Yang, First published: + * April 2010 originally posted at + * http://lemire.me/blog/archives/2010/04/01/external-memory-sorting-in-java/ */ public class ExternalSort { @@ -74,12 +78,17 @@ } static int DEFAULTMAXTEMPFILES = 1024; - + + /** + * Defines the default maximum memory to be used while sorting (8 MB) + */ + static long DEFAULT_MAX_MEM_BYTES = 8388608L; + // we divide the file into small blocks. If the blocks // are too small, we shall create too many temporary files. // If they are too big, we shall be using too much memory. 
public static long estimateBestSizeOfBlocks(File filetobesorted, - int maxtmpfiles) { + int maxtmpfiles, long maxMemory) { long sizeoffile = filetobesorted.length() * 2; /** * We multiply by two because later on someone insisted on counting the memory usage as 2 @@ -93,11 +102,11 @@ // on the other hand, we don't want to create many temporary // files - // for naught. If blocksize is smaller than half the free - // memory, grow it. - long freemem = Runtime.getRuntime().freeMemory(); - if (blocksize < freemem / 2) { - blocksize = freemem / 2; + // for naught. If blocksize is less than maximum allowed memory, + // scale the blocksize to be equal to the maxMemory parameter + + if (blocksize < maxMemory) { + blocksize = maxMemory; } return blocksize; } @@ -114,7 +123,7 @@ */ public static List sortInBatch(File file) throws IOException { - return sortInBatch(file, defaultcomparator, DEFAULTMAXTEMPFILES, + return sortInBatch(file, defaultcomparator, DEFAULTMAXTEMPFILES, DEFAULT_MAX_MEM_BYTES, Charset.defaultCharset(), null, false); } @@ -130,7 +139,7 @@ */ public static List sortInBatch(File file, Comparator cmp) throws IOException { - return sortInBatch(file, cmp, DEFAULTMAXTEMPFILES, + return sortInBatch(file, cmp, DEFAULTMAXTEMPFILES, DEFAULT_MAX_MEM_BYTES, Charset.defaultCharset(), null, false); } @@ -148,7 +157,7 @@ */ public static List sortInBatch(File file, Comparator cmp, boolean distinct) throws IOException { - return sortInBatch(file, cmp, DEFAULTMAXTEMPFILES, + return sortInBatch(file, cmp, DEFAULTMAXTEMPFILES, DEFAULT_MAX_MEM_BYTES, Charset.defaultCharset(), null, distinct); } @@ -175,14 +184,14 @@ * @return a list of temporary flat files */ public static List sortInBatch(File file, Comparator cmp, - int maxtmpfiles, Charset cs, File tmpdirectory, + int maxtmpfiles, long maxMemory, Charset cs, File tmpdirectory, boolean distinct, int numHeader, boolean usegzip) throws IOException { List files = new ArrayList(); BufferedReader fbr = new BufferedReader(new 
InputStreamReader( new FileInputStream(file), cs)); - long blocksize = estimateBestSizeOfBlocks(file, maxtmpfiles);// in - // bytes + long blocksize = estimateBestSizeOfBlocks(file, maxtmpfiles, maxMemory);// in + // bytes try { List tmplist = new ArrayList(); @@ -243,9 +252,9 @@ * @return a list of temporary flat files */ public static List sortInBatch(File file, Comparator cmp, - int maxtmpfiles, Charset cs, File tmpdirectory, boolean distinct) + int maxtmpfiles, long maxMemory, Charset cs, File tmpdirectory, boolean distinct) throws IOException { - return sortInBatch(file, cmp, maxtmpfiles, cs, tmpdirectory, + return sortInBatch(file, cmp, maxtmpfiles, maxMemory, cs, tmpdirectory, distinct, 0, false); } @@ -520,6 +529,8 @@ System.out .println("-t or --maxtmpfiles (followed by an integer): specify an upper bound on the number of temporary files"); System.out + .println("-m or --maxmembytes (followed by a long): specify an upper bound on the memory"); + System.out .println("-c or --charset (followed by a charset code): specify the character set to use (for sorting)"); System.out .println("-z or --gzip: use compression for the temporary files"); @@ -534,6 +545,7 @@ boolean verbose = false; boolean distinct = false; int maxtmpfiles = DEFAULTMAXTEMPFILES; + long maxMemory = DEFAULT_MAX_MEM_BYTES; Charset cs = Charset.defaultCharset(); String inputfile = null, outputfile = null; File tempFileStore = null; @@ -559,6 +571,15 @@ System.err .println("maxtmpfiles should be positive"); } + } else if ((args[param].equals("-m") || args[param] + .equals("--maxmembytes")) + && args.length > param + 1) { + param++; + maxMemory = Long.parseLong(args[param]); + if (maxMemory <= 0) { /* warn on a non-positive -m value */ + System.err + .println("maxmembytes should be positive"); + } } else if ((args[param].equals("-c") || args[param] .equals("--charset")) && args.length > param + 1) { @@ -597,7 +618,7 @@ } Comparator comparator = defaultcomparator; List l = sortInBatch(new File(inputfile), comparator, - maxtmpfiles,
cs, tempFileStore, distinct, headersize, + maxtmpfiles, maxMemory, cs, tempFileStore, distinct, headersize, usegzip); if (verbose) System.out Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java (revision 1570529) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/cloud/CloudBlobStore.java (working copy) @@ -227,23 +227,27 @@ } @Override - public boolean deleteChunk(String chunkId) throws Exception { + public boolean deleteChunk(String chunkId, long maxLastModifiedTime) throws Exception { Preconditions.checkNotNull(context); final org.jclouds.blobstore.BlobStore blobStore = context.getBlobStore(); - blobStore.removeBlob(cloudContainer, chunkId); - + StorageMetadata metadata = blobStore.blobMetadata(cloudContainer, chunkId); + if ((maxLastModifiedTime <= 0) + || (metadata.getLastModified().getTime() <= maxLastModifiedTime)) { + blobStore.removeBlob(cloudContainer, chunkId); + return true; + } - return true; + return false; /* blob is newer than maxLastModifiedTime, nothing was deleted */ } class CloudStoreIterator implements Iterator { - static final int BATCH = 1000; + private static final int BATCH = 1000; - org.jclouds.blobstore.BlobStore store; - long maxLastModifiedTime; + private org.jclouds.blobstore.BlobStore store; + private long maxLastModifiedTime; - PageSet set; - ArrayDeque queue; + private PageSet set; + private ArrayDeque queue; public CloudStoreIterator(org.jclouds.blobstore.BlobStore store, long maxLastModifiedTime) { @@ -255,7 +259,7 @@ @Override public boolean hasNext() { if ((set == null) || (queue == null)) { - set = store.list(cloudContainer, maxResults(1000)); + set = store.list(cloudContainer, maxResults(BATCH)); loadElements(set); } @@ -278,8 +282,8 @@ Iterator iter = set.iterator(); while (iter.hasNext()) { StorageMetadata metadata = iter.next(); - if ((maxLastModifiedTime == 0 || maxLastModifiedTime == -1) ||
- (metadata.getLastModified().getTime() <= maxLastModifiedTime)) { + if ((maxLastModifiedTime <= 0) + || (metadata.getLastModified().getTime() <= maxLastModifiedTime)) { queue.add(metadata.getName()); } else { queue.add(metadata.getName()); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (revision 1570529) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/datastore/DataStoreBlobStore.java (working copy) @@ -522,9 +522,17 @@ } @Override - public boolean deleteChunk(String blobId) throws Exception { - ((MultiDataStoreAware) dataStore).deleteRecord(new DataIdentifier(blobId)); - return true; + public boolean deleteChunk(String blobId, long maxLastModifiedTime) throws Exception { + if (dataStore instanceof MultiDataStoreAware) { + DataIdentifier identifier = new DataIdentifier(blobId); + DataRecord dataRecord = dataStore.getRecord(identifier); + if ((maxLastModifiedTime <= 0) + || dataRecord.getLastModified() <= maxLastModifiedTime) { + ((MultiDataStoreAware) dataStore).deleteRecord(identifier); + return true; + } + } + return false; } @Override Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java (revision 1570529) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/db/DbBlobStore.java (working copy) @@ -215,16 +215,27 @@ } @Override - public boolean deleteChunk(String chunkId) throws Exception { + public boolean deleteChunk(String chunkId, long maxLastModifiedTime) throws Exception { Connection conn = cp.getConnection(); try { - PreparedStatement prep = conn - .prepareStatement("delete from datastore_meta where id = ?"); - ;;; - // 
TODO and lastMod <= ? - ;;; - PreparedStatement prepData = conn - .prepareStatement("delete from datastore_data where id = ?"); + PreparedStatement prep = null; + PreparedStatement prepData = null; + + if (maxLastModifiedTime > 0) { + prep = conn.prepareStatement( + "delete from datastore_meta where id = ? and lastMod <= ?"); + prep.setLong(2, maxLastModifiedTime); + + prepData = conn.prepareStatement( + "delete from datastore_data where id = ? and lastMod <= ?"); + prepData.setLong(2, maxLastModifiedTime); + } else { + prep = conn.prepareStatement( + "delete from datastore_meta where id = ?"); + + prepData = conn.prepareStatement( + "delete from datastore_data where id = ?"); + } prep.setString(1, chunkId); prep.execute(); prepData.setString(1, chunkId); @@ -245,7 +256,7 @@ final Connection conn = cp.getConnection(); PreparedStatement prep = null; - if ((maxLastModifiedTime != 0) && (maxLastModifiedTime != -1)) { + if (maxLastModifiedTime > 0) { prep = conn.prepareStatement( "select id from datastore_meta where lastMod <= ?"); prep.setLong(1, maxLastModifiedTime); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (revision 1570529) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/blob/MarkSweepGarbageCollector.java (working copy) @@ -70,17 +70,20 @@ public static final int DEFAULT_BATCH_COUNT = 2048; + /** The max last modified time of blobs to consider for garbage collection. */ + private long maxLastModifiedTime; + /** Run concurrently when possible. */ - boolean runConcurrently = true; + private boolean runConcurrently = true; /** The number of sweeper threads to use. */ - int numSweepers = 1; + private int numSweepers = 1; /** The node store. 
*/ - DocumentNodeStore nodeStore; + private DocumentNodeStore nodeStore; /** The garbage collector file state */ - GarbageCollectorFileState fs; + private GarbageCollectorFileState fs; /** The configured root to store gc process files. */ private String root = TEMP_DIR; @@ -89,6 +92,24 @@ private int batchCount = DEFAULT_BATCH_COUNT; /** + * Gets the max last modified time considered for garbage collection. + * + * @return the max last modified time + */ + protected long getMaxLastModifiedTime() { + return maxLastModifiedTime; + } + + /** + * Sets the max last modified time considered for garbage collection. + * + * @param maxLastModifiedTime the new max last modified time + */ + protected void setMaxLastModifiedTime(long maxLastModifiedTime) { + this.maxLastModifiedTime = maxLastModifiedTime; + } + + /** * Gets the root. * * @return the root @@ -125,20 +146,13 @@ } /** - * Instantiates a new blob garbage collector. - * - * @param nodeStore - * the node store - * @param root - * the root - * @param batchCount - * the batch count - * @param runBackendConcurrently - * - run the backend iterate concurrently - * @param maxSweeperThreads - * the max sweeper threads - * @throws IOException - * Signals that an I/O exception has occurred. + * @param nodeStore the node store + * @param root the root + * @param batchCount the batch count + * @param runBackendConcurrently - run the backend iterate concurrently + * @param maxSweeperThreads the max sweeper threads + * @param maxLastModifiedTime the max last modified time + * @throws IOException Signals that an I/O exception has occurred. 
*/ public void init( NodeStore nodeStore, @@ -145,12 +159,14 @@ String root, int batchCount, boolean runBackendConcurrently, - int maxSweeperThreads) + int maxSweeperThreads, + long maxLastModifiedTime) throws IOException { this.batchCount = batchCount; this.root = root; this.runConcurrently = runBackendConcurrently; this.numSweepers = maxSweeperThreads; + this.maxLastModifiedTime = maxLastModifiedTime; init(nodeStore); } @@ -356,10 +372,10 @@ class Sweeper implements Runnable { /** The exception queue. */ - ConcurrentLinkedQueue exceptionQueue; + private ConcurrentLinkedQueue exceptionQueue; /** The ids to sweep. */ - List ids; + private List ids; /** * Instantiates a new sweeper. @@ -379,7 +395,7 @@ for (String id : ids) { try { boolean deleted = ((GarbageCollectableBlobStore) nodeStore.getBlobStore()) - .deleteChunk(id); + .deleteChunk(id, maxLastModifiedTime); if (!deleted) { exceptionQueue.add(id); } @@ -394,8 +410,6 @@ /** * Iterates the complete node tree. * - * @param writer - * the writer * @return the list * @throws Exception * the exception @@ -446,9 +460,6 @@ * BlobIdRetriever class to retrieve all blob ids. 
*/ class BlobIdRetriever implements Runnable { - - boolean finished; - @Override public void run() { retrieve(); @@ -465,7 +476,7 @@ bufferWriter = new BufferedWriter( new FileWriter(fs.getAvailableRefs())); Iterator idsIter = ((GarbageCollectableBlobStore) nodeStore.getBlobStore()) - .getAllChunkIds(0); + .getAllChunkIds(maxLastModifiedTime); List ids = Lists.newArrayList(); int blobsCount = 0; while (idsIter.hasNext()) { @@ -483,8 +494,6 @@ // sort the file fs.sort(fs.getAvailableRefs()); - - finished = true; LOG.debug("Ending retrieve of all blobs : " + blobsCount); } catch (Exception e) { e.printStackTrace(); Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java (revision 1570529) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoBlobStore.java (working copy) @@ -192,12 +192,10 @@ } @Override - public boolean deleteChunk(String chunkId) throws Exception { + public boolean deleteChunk(String chunkId, long maxLastModifiedTime) throws Exception { DBCollection collection = getBlobCollection(); - BasicDBObject removeObj = new BasicDBObject(); - removeObj.append(MongoBlob.KEY_ID, chunkId); + WriteResult result = collection.remove(getBlobQuery(chunkId, maxLastModifiedTime)); - WriteResult result = collection.remove(removeObj); if (result.getN() == 1) { return true; } Index: oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java =================================================================== --- oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java (revision 1570529) +++ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/rdb/RDBBlobStore.java (working copy) @@ -286,12 +286,25 @@ } @Override - public boolean deleteChunk(String chunkId) throws 
Exception { + public boolean deleteChunk(String chunkId, long maxLastModifiedTime) throws Exception { try { - PreparedStatement prep = connection.prepareStatement( - "delete from datastore_meta where id = ?"); - PreparedStatement prepData = connection.prepareStatement( - "delete from datastore_data where id = ?"); + PreparedStatement prep = null; + PreparedStatement prepData = null; + + if (maxLastModifiedTime > 0) { + prep = connection.prepareStatement( + "delete from datastore_meta where id = ? and lastMod <= ?"); + prep.setLong(2, maxLastModifiedTime); + + prepData = connection.prepareStatement( + "delete from datastore_data where id = ? and lastMod <= ?"); + prepData.setLong(2, maxLastModifiedTime); + } else { + prep = connection.prepareStatement( + "delete from datastore_meta where id = ?"); + prepData = connection.prepareStatement( + "delete from datastore_data where id = ?"); + } prep.setString(1, chunkId); prep.execute(); prepData.setString(1, chunkId); @@ -301,7 +314,7 @@ prepData.close(); } finally { connection.commit(); -} + } return true; } @@ -310,7 +323,7 @@ public Iterator getAllChunkIds(long maxLastModifiedTime) throws Exception { PreparedStatement prep = null; - if ((maxLastModifiedTime != 0) && (maxLastModifiedTime != -1)) { + if (maxLastModifiedTime > 0) { prep = connection.prepareStatement( "select id from datastore_meta where lastMod <= ?"); prep.setLong(1, maxLastModifiedTime); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/AbstractBlobStoreTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/AbstractBlobStoreTest.java (revision 1570529) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/blob/AbstractBlobStoreTest.java (working copy) @@ -383,7 +383,7 @@ Set ids = createArtifacts(); for (String id : ids) { - store.deleteChunk(id); + store.deleteChunk(id, 0); } Iterator iter = 
store.getAllChunkIds(0); Index: oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java =================================================================== --- oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (revision 1570529) +++ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/MongoBlobGCTest.java (working copy) @@ -94,7 +94,7 @@ DocumentNodeStore s = mk.getNodeStore(); MarkSweepGarbageCollector gc = new MarkSweepGarbageCollector(); - gc.init(s, "./target", 2048, true, 2); + gc.init(s, "./target", 2048, true, 2, 0); gc.collectGarbage(); Set existing = iterate();